1. Overview

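Apache Kafka is a distributed event streaming platform that collects, processes, stores, and integrates data at scale. In practice, we sometimes need to delay the processing of Kafka messages. A typical example is an order-processing system that lets users cancel an order within X seconds, so the system delays processing each order to leave room for that cancellation.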

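In this article, we'll look at how to delay message consumption with Spring Kafka. Kafka doesn't support delayed consumption out of the box, but with careful design we can achieve a similar effect.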

2. Use Case

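Spring Kafka offers several mechanisms for retrying failed messages, and we'll build on one of them to delay message processing. It's therefore worth understanding how these retries work.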

Consider an order-processing application:

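  • The user places an order through the UI
  • The user can cancel an order placed by mistake within 10 seconds
  • Orders are published to the Kafka topic web.orders and processed by the application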

An external service exposes the current status of an order (CREATED, ORDER_CONFIRMED, ORDER_PROCESSED, or DELETED). The application needs to:

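  1. Wait 10 seconds after receiving the message
  2. Query the external service for the order's status
  3. Process the order only if its status is ORDER_CONFIRMED (that is, the user hasn't cancelled it)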

For testing purposes, internal orders received on web.internal.orders are processed without any delay.

We define an Order model that holds orderGeneratedDateTime, set by the producer, and orderProcessedTime, set by the consumer once it processes the order after the delay:

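import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data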
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {

    private UUID orderId;

    private LocalDateTime orderGeneratedDateTime;

    private LocalDateTime orderProcessedTime;

    private List<String> address;

    private double price;
}

3. Kafka Listener and External Service

Next, we'll implement the topic listener and the order status service.

We create a @KafkaListener that consumes from the web.orders and web.internal.orders topics:

@RetryableTopic(attempts = "1", include = KafkaBackoffException.class, dltStrategy = DltStrategy.NO_DLT)
@KafkaListener(topics = { "web.orders", "web.internal.orders" }, groupId = "orders")
public void handleOrders(String order) throws JsonProcessingException {
    Order orderDetails = objectMapper.readValue(order, Order.class);
    OrderService.Status orderStatus = orderService.findStatusById(orderDetails.getOrderId());
    if (orderStatus.equals(OrderService.Status.ORDER_CONFIRMED)) {
        orderService.processOrder(orderDetails);
    }
}

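⚠️ Including KafkaBackoffException is essential: it allows the listener to retry the message instead of treating the back-off as a failure. To keep the demo simple, the external OrderService always returns ORDER_CONFIRMED. The processOrder() method records the processing time and stores the order: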

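import java.time.LocalDateTime;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    public enum Status { CREATED, ORDER_CONFIRMED, ORDER_PROCESSED, DELETED }

    // Written by the listener thread and read by the tests, hence a concurrent map
    private final Map<UUID, Order> orders = new ConcurrentHashMap<>();

    // Stub for the external status service: always reports the order as confirmed
    public Status findStatusById(UUID orderId) {
        return Status.ORDER_CONFIRMED;
    }

    public void processOrder(Order order) {
        order.setOrderProcessedTime(LocalDateTime.now());
        orders.put(order.getOrderId(), order);
    }

    public Map<UUID, Order> getOrders() {
        return orders;
    }
}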

4. A Custom Delayed Message Listener

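Spring Kafka's KafkaBackoffAwareMessageListenerAdapter delays retries by checking a back-off due timestamp stored in a record header. Based on the same idea, we'll implement a custom DelayedMessageListenerAdapter that lets us configure the delay per topic.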

Key implementation points:

  • Extend AbstractDelegatingMessageListenerAdapter
  • Allow a delay to be set per topic
  • Default the delay to zero

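public class DelayedMessageListenerAdapter<K, V> extends AbstractDelegatingMessageListenerAdapter<MessageListener<K, V>> 
  implements AcknowledgingConsumerAwareMessageListener<K, V> {

    private final String listenerId;
    private final KafkaConsumerBackoffManager kafkaConsumerBackoffManager;
    // Delay per topic name; topics without an entry use defaultDelay
    private final Map<String, Duration> delaysPerTopic = new ConcurrentHashMap<>();
    private Duration defaultDelay = Duration.ZERO;

    public DelayedMessageListenerAdapter(MessageListener<K, V> delegate, KafkaConsumerBackoffManager kafkaConsumerBackoffManager, String listenerId) {
        super(delegate);
        this.kafkaConsumerBackoffManager = Objects.requireNonNull(kafkaConsumerBackoffManager, "kafkaConsumerBackoffManager cannot be null");
        this.listenerId = Objects.requireNonNull(listenerId, "listenerId cannot be null");
    }

    public void setDelayForTopic(String topic, Duration delay) {
        Objects.requireNonNull(topic, "Topic cannot be null");
        Objects.requireNonNull(delay, "Delay cannot be null");
        this.logger.debug(() -> String.format("Setting delay %s for topic %s, listener id %s", delay, topic, this.listenerId));
        this.delaysPerTopic.put(topic, delay);
    }

    public void setDefaultDelay(Duration delay) {
        Objects.requireNonNull(delay, "Delay cannot be null");
        this.logger.debug(() -> String.format("Setting default delay %s for listener id %s", delay, this.listenerId));
        this.defaultDelay = delay;
    }

    @Override
    public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) throws KafkaBackoffException {
        // Due time = record timestamp + delay configured for the record's topic (or the default)
        long dueTimestamp = consumerRecord.timestamp()
          + delaysPerTopic.getOrDefault(consumerRecord.topic(), this.defaultDelay).toMillis();
        // Pauses the partition and throws KafkaBackoffException while the due time is still in the future
        this.kafkaConsumerBackoffManager.backOffIfNecessary(createContext(consumerRecord, dueTimestamp, consumer));
        invokeDelegateOnMessage(consumerRecord, acknowledgment, consumer);
    }

    // Calls the wrapped listener through the onMessage() variant it supports
    private void invokeDelegateOnMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        switch (this.delegateType) {
            case ACKNOWLEDGING_CONSUMER_AWARE -> this.delegate.onMessage(consumerRecord, acknowledgment, consumer);
            case ACKNOWLEDGING -> this.delegate.onMessage(consumerRecord, acknowledgment);
            case CONSUMER_AWARE -> this.delegate.onMessage(consumerRecord, consumer);
            case SIMPLE -> this.delegate.onMessage(consumerRecord);
        }
    }

    private KafkaConsumerBackoffManager.Context createContext(ConsumerRecord<K, V> data, long nextExecutionTimestamp, Consumer<?, ?> consumer) {
        return this.kafkaConsumerBackoffManager.createContext(nextExecutionTimestamp, 
          this.listenerId, 
          new TopicPartition(data.topic(), data.partition()), consumer);
    }
}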

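How it works:

  1. When a record arrives, the adapter looks up the delay configured for its topic, falling back to the default
  2. It computes the due timestamp = record timestamp + delay
  3. It calls KafkaConsumerBackoffManager.backOffIfNecessary(): if the due time hasn't been reached, the partition is paused and a KafkaBackoffException is thrown so the record is redelivered once the partition resumes; otherwise the record goes straight to the delegate listener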
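The decision in steps 1 to 3 can be modelled without Kafka at all. The sketch below is a hypothetical plain-Java class (DelayDecision is not part of Spring Kafka) that mirrors the per-topic lookup and the due-timestamp arithmetic from onMessage(); the current time is passed in explicitly so the result is deterministic. A positive remainingMillis() corresponds to the case in which the back-off manager would pause the partition.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical plain-Java model of the delay decision made in DelayedMessageListenerAdapter.onMessage()
class DelayDecision {

    private final Map<String, Duration> delaysPerTopic = new ConcurrentHashMap<>();
    private Duration defaultDelay = Duration.ZERO;

    void setDelayForTopic(String topic, Duration delay) {
        delaysPerTopic.put(topic, delay);
    }

    void setDefaultDelay(Duration delay) {
        this.defaultDelay = delay;
    }

    // Due time = record timestamp + delay configured for the topic (or the default)
    long dueTimestamp(String topic, long recordTimestampMillis) {
        return recordTimestampMillis + delaysPerTopic.getOrDefault(topic, defaultDelay).toMillis();
    }

    // How long the partition would stay paused; 0 means the record can be processed now
    long remainingMillis(String topic, long recordTimestampMillis, long nowMillis) {
        return Math.max(0, dueTimestamp(topic, recordTimestampMillis) - nowMillis);
    }

    public static void main(String[] args) {
        DelayDecision decision = new DelayDecision();
        decision.setDelayForTopic("web.orders", Duration.ofSeconds(10));
        long recordTs = 1_000_000L;

        // 3 s after the record was produced, a web.orders record still has 7 s to wait
        System.out.println(decision.remainingMillis("web.orders", recordTs, recordTs + 3_000)); // 7000
        // Internal orders use the default (zero) delay and are processed immediately
        System.out.println(decision.remainingMillis("web.internal.orders", recordTs, recordTs + 3_000)); // 0
    }
}
```

Because the delay is measured from the record timestamp rather than from the moment of consumption, a record that has already waited in the topic for longer than its delay is processed without any pause.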

5. Listener Configuration

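ConcurrentKafkaListenerContainerFactory is the core Spring Kafka component that creates the containers behind @KafkaListener methods. We customize it so that each container's listener is wrapped in a DelayedMessageListenerAdapter.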

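Configuration points:

  • A 10-second delay for web.orders
  • A zero (default) delay for all other topics
  • AckMode.RECORD, so offsets are committed record by record and a message is redelivered if an error occurs while it is being processed
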
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(ConsumerFactory<Object, Object> consumerFactory, 
  ListenerContainerRegistry registry, TaskScheduler scheduler) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    KafkaConsumerBackoffManager backOffManager = createBackOffManager(registry, scheduler);
    factory.getContainerProperties()
      .setAckMode(ContainerProperties.AckMode.RECORD);
    factory.setContainerCustomizer(container -> {
        DelayedMessageListenerAdapter<Object, Object> delayedAdapter = wrapWithDelayedMessageListenerAdapter(backOffManager, container);
        delayedAdapter.setDelayForTopic("web.orders", Duration.ofSeconds(10));
        delayedAdapter.setDefaultDelay(Duration.ZERO);
        container.setupMessageListener(delayedAdapter);
    });
    return factory;
}

@SuppressWarnings("unchecked")
private DelayedMessageListenerAdapter<Object, Object> wrapWithDelayedMessageListenerAdapter(KafkaConsumerBackoffManager backOffManager, 
  ConcurrentMessageListenerContainer<Object, Object> container) {
    return new DelayedMessageListenerAdapter<>((MessageListener<Object, Object>) container.getContainerProperties()
      .getMessageListener(), backOffManager, container.getListenerId());
}

private ContainerPartitionPausingBackOffManager createBackOffManager(ListenerContainerRegistry registry, TaskScheduler scheduler) {
    return new ContainerPartitionPausingBackOffManager(registry, 
      new ContainerPausingBackOffHandler(new ListenerContainerPauseService(registry, scheduler)));
}

✅ Key configuration: a TaskScheduler bean is required, because it resumes the paused partitions once the delay has elapsed:

@Bean
public TaskScheduler taskScheduler() {
    return new ThreadPoolTaskScheduler();
}
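Roughly speaking, the pause service pauses a partition and relies on the TaskScheduler to resume it at the due time. The following is only an analogy in plain Java: it assumes a ScheduledExecutorService in place of Spring's TaskScheduler and strings such as "web.orders-0" in place of real TopicPartitions, and PauseResumeModel is a hypothetical class. It illustrates why a scheduler thread is needed at all: nothing else would ever resume the paused partition.

```java
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical model of "pause now, resume later on a scheduler"; no Kafka involved
class PauseResumeModel {

    private final Set<String> pausedPartitions = ConcurrentHashMap.newKeySet();
    private final ScheduledExecutorService scheduler;

    PauseResumeModel(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    // Marks the partition as paused and schedules its resumption after the given delay
    void pauseFor(String partition, Duration delay, Runnable onResume) {
        pausedPartitions.add(partition);
        scheduler.schedule(() -> {
            pausedPartitions.remove(partition);
            onResume.run();
        }, delay.toMillis(), TimeUnit.MILLISECONDS);
    }

    boolean isPaused(String partition) {
        return pausedPartitions.contains(partition);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        PauseResumeModel model = new PauseResumeModel(scheduler);
        CountDownLatch resumed = new CountDownLatch(1);

        model.pauseFor("web.orders-0", Duration.ofMillis(200), resumed::countDown);
        System.out.println("paused right after pausing: " + model.isPaused("web.orders-0"));

        // Wait for the scheduled resume, then check the partition again
        resumed.await(5, TimeUnit.SECONDS);
        System.out.println("paused after the delay: " + model.isPaused("web.orders-0"));
        scheduler.shutdown();
    }
}
```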

6. Testing

6.1 Delayed Consumption

We verify that orders on the web.orders topic are processed only after a 10-second delay:

@Test
void givenKafkaBrokerExists_whenCreateOrderIsReceived_thenMessageShouldBeDelayed() throws Exception {
    // Given
    var orderId = UUID.randomUUID();
    Order order = Order.builder()
      .orderId(orderId)
      .price(1.0)
      .orderGeneratedDateTime(LocalDateTime.now())
      .address(List.of("41 Felix Avenue, Luton"))
      .build();

    String orderString = objectMapper.writeValueAsString(order);
    ProducerRecord<String, String> record = new ProducerRecord<>("web.orders", orderString);
    
    // When
    testKafkaProducer.send(record)
      .get();
    await().atMost(Duration.ofSeconds(1800))
      .until(() -> {
          // then
          Map<UUID, Order> orders = orderService.getOrders();
          return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId)
              .getOrderGeneratedDateTime(), orders.get(orderId)
              .getOrderProcessedTime())
            .getSeconds() >= 10;
      });
}

6.2 Immediate Consumption

We verify that orders on the web.internal.orders topic are processed immediately (within 1 second):

@Test
void givenKafkaBrokerExists_whenCreateOrderIsReceivedForOtherTopics_thenMessageShouldNotBeDelayed() throws Exception {
    // Given
    var orderId = UUID.randomUUID();
    Order order = Order.builder()
      .orderId(orderId)
      .price(1.0)
      .orderGeneratedDateTime(LocalDateTime.now())
      .address(List.of("41 Felix Avenue, Luton"))
      .build();

    String orderString = objectMapper.writeValueAsString(order);
    ProducerRecord<String, String> record = new ProducerRecord<>("web.internal.orders", orderString);
    
    // When
    testKafkaProducer.send(record)
      .get();
    await().atMost(Duration.ofSeconds(1800))
      .until(() -> {
          // Then
          Map<UUID, Order> orders = orderService.getOrders();
          // Null checks come first: the order may not have been processed yet
          return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId)
              .getOrderGeneratedDateTime(), orders.get(orderId)
              .getOrderProcessedTime())
            .getSeconds() <= 1;
      });
}
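Both tests repeat the same Duration.between(...) chain, and the null checks must run before the delay is computed. One option is a small null-safe helper. The sketch below is hypothetical: ProcessingDelayCheck and its OrderTimes record are stand-ins for the Lombok Order class so that the snippet compiles on its own.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper the two tests could share; OrderTimes stands in for Order
class ProcessingDelayCheck {

    record OrderTimes(LocalDateTime generated, LocalDateTime processed) {}

    // Returns the processing delay, or null while the order has not been processed yet
    static Duration processingDelay(Map<UUID, OrderTimes> orders, UUID orderId) {
        OrderTimes order = orders == null ? null : orders.get(orderId);
        if (order == null || order.processed() == null) {
            return null;
        }
        return Duration.between(order.generated(), order.processed());
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        LocalDateTime generated = LocalDateTime.of(2024, 1, 1, 12, 0, 0);
        Map<UUID, OrderTimes> orders = Map.of(id, new OrderTimes(generated, generated.plusSeconds(11)));

        // Condition used for the delayed topic
        Duration delay = processingDelay(orders, id);
        System.out.println(delay != null && delay.getSeconds() >= 10);
        // An unknown order yields null instead of a NullPointerException
        System.out.println(processingDelay(orders, UUID.randomUUID()));
    }
}
```

Adapted to the real Order type, the awaitility condition would then only need to check that the returned delay is non-null and compare its seconds against 10 (or 1 for the internal topic).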

7. Conclusion

In this article, we showed how to consume Kafka messages with a fixed delay using Spring Kafka. The core ideas are:

  1. Reuse Spring Kafka's retry and back-off mechanism
  2. Control the delay with a custom listener adapter
  3. Configure different delays per topic

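As an extension, the delay could be controlled dynamically through a time field in the message body. The complete example code is available on GitHub.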
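One way the dynamic-delay extension might look, assuming a hypothetical processAt field carried in the message (the Order model above has no such field): take the due time as the later of the fixed per-topic due time and the requested time. DynamicDelay is an illustrative name; its result would be what onMessage() passes to createContext(...) in place of the fixed calculation.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch: a per-message "processAt" time can extend, but never shorten, the topic delay
class DynamicDelay {

    static long dueTimestamp(long recordTimestampMillis, Duration topicDelay, Instant processAt) {
        long fixedDue = recordTimestampMillis + topicDelay.toMillis();
        if (processAt == null) {
            return fixedDue; // no per-message override: use the fixed topic delay
        }
        return Math.max(fixedDue, processAt.toEpochMilli());
    }

    public static void main(String[] args) {
        long recordTs = Instant.parse("2024-01-01T12:00:00Z").toEpochMilli();
        Instant processAt = Instant.parse("2024-01-01T12:00:30Z");

        long due = dueTimestamp(recordTs, Duration.ofSeconds(10), processAt);
        System.out.println(Duration.ofMillis(due - recordTs).getSeconds()); // 30

        long fixedOnly = dueTimestamp(recordTs, Duration.ofSeconds(10), null);
        System.out.println(Duration.ofMillis(fixedOnly - recordTs).getSeconds()); // 10
    }
}
```

Taking the maximum keeps the 10-second cancellation window as a lower bound, so a producer cannot accidentally bypass it by sending an earlier processAt.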


Original title: Consumer Processing of Kafka Messages With Delay