2. 实现主主题消费者

最后,我们来实现主主题的消费者代码:

@KafkaListener(topics = { "payments" }, groupId = "payments")
public void handlePayment(
  Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    log.info("Event on main topic={}, payload={}", topic, payment);
}

在深入死信主题示例前,我们先讨论重试配置。

3. 关闭重试机制

实际项目中,通常会在将事件发送到死信队列前进行重试处理。这可以通过Spring Kafka提供的非阻塞重试机制轻松实现。

但本文中,我们将关闭重试机制,重点展示死信队列的工作原理。当主主题消费者处理失败时,事件会直接进入死信队列。

3.1. 配置生产者组件

首先需要定义两个核心bean:

@Bean
public ProducerFactory<String, Payment> producerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    return new DefaultKafkaProducerFactory<>(
      config, new StringSerializer(), new JsonSerializer<>());
}

@Bean
public KafkaTemplate<String, Payment> retryableTopicKafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

3.2. 定义单次重试消费者

现在我们可以定义主主题消费者,并设置重试次数为1(即不重试):

@RetryableTopic(attempts = "1", kafkaTemplate = "retryableTopicKafkaTemplate")
@KafkaListener(topics = { "payments"}, groupId = "payments")
public void handlePayment(
  Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    log.info("Event on main topic={}, payload={}", topic, payment);
}

⚠️ 关键点说明

  • 通过attempts = "1"实现单次尝试,失败后立即进入死信队列
  • 需要配合kafkaTemplate参数指定消息转发模板
  • 这种配置适合需要快速失败并进入死信队列的场景

优势

  • 简化错误处理流程
  • 避免无效重试消耗资源
  • 适合对实时性要求高的业务

注意事项

  • 网络抖动等临时性问题也会直接进入死信队列
  • 需要确保死信队列消费者有完善的监控和告警机制

原始标题:Dead Letter Queue for Kafka With Spring