2. 实现主主题消费者
最后,我们来实现主主题的消费者代码:
@KafkaListener(topics = { "payments" }, groupId = "payments")
public void handlePayment(
Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on main topic={}, payload={}", topic, payment);
}
在深入死信主题示例前,我们先讨论重试配置。
3. 关闭重试机制
实际项目中,通常会在将事件发送到死信队列前进行重试处理。这可以通过Spring Kafka提供的非阻塞重试机制轻松实现。
但本文中,我们将关闭重试机制,重点展示死信队列的工作原理。当主主题消费者处理失败时,事件会直接进入死信队列。
3.1. 配置生产者组件
首先需要定义两个核心bean:
@Bean
public ProducerFactory<String, Payment> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new DefaultKafkaProducerFactory<>(
config, new StringSerializer(), new JsonSerializer<>());
}
@Bean
public KafkaTemplate<String, Payment> retryableTopicKafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
3.2. 定义单次重试消费者
现在我们可以定义主主题消费者,并设置重试次数为1(即不重试):
@RetryableTopic(attempts = "1", kafkaTemplate = "retryableTopicKafkaTemplate")
@KafkaListener(topics = { "payments"}, groupId = "payments")
public void handlePayment(
Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on main topic={}, payload={}", topic, payment);
}
⚠️ 关键点说明:
- 通过
attempts = "1"
实现单次尝试,失败后立即进入死信队列 - 需要配合
kafkaTemplate
参数指定消息转发模板 - 这种配置适合需要快速失败并进入死信队列的场景
✅ 优势:
- 简化错误处理流程
- 避免无效重试消耗资源
- 适合对实时性要求高的业务
❌ 注意事项:
- 网络抖动等临时性问题也会直接进入死信队列
- 需要确保死信队列消费者有完善的监控和告警机制