1. 概述
Apache Kafka 是一个分布式事件流平台,能够大规模地收集、处理、存储和集成数据。在实际开发中,我们有时需要延迟处理 Kafka 中的消息。典型场景是订单处理系统:允许用户在 X 秒内取消订单,系统则延迟处理订单以适应这种需求。
本文将探讨如何使用 Spring Kafka 实现消息的延迟消费。虽然 Kafka 本身不直接支持延迟消费功能,但我们可以通过巧妙的设计实现类似效果。
2. 应用场景
Kafka 提供了多种错误重试机制,我们将利用这个机制来实现消息延迟处理。因此理解 Kafka 重试机制的工作原理至关重要。
考虑一个订单处理应用:
- 用户通过界面下单
- 用户可在 10 秒内取消误操作订单
- 订单进入 Kafka 主题
web.orders
,由应用处理
外部服务提供订单状态查询接口(状态包括:CREATED、ORDER_CONFIRMED、ORDER_PROCESSED、DELETED)。应用需要:
- 接收消息后等待 10 秒
- 检查外部服务确认订单状态
- 仅当状态为 CONFIRMED(用户未取消)时处理订单
测试场景:来自 web.orders.internal
的内部订单无需延迟处理。
定义订单模型 Order
,包含生产者生成的 orderGeneratedDateTime
和消费者延迟处理后设置的 orderProcessedTime
:
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {
private UUID orderId;
private LocalDateTime orderGeneratedDateTime;
private LocalDateTime orderProcessedTime;
private List<String> address;
private double price;
}
3. Kafka 监听器与外部服务
接下来我们将实现主题消费监听器和订单状态服务。
创建 @KafkaListener
监听 web.orders
和 web.internal.orders
主题:
@RetryableTopic(attempts = "1", include = KafkaBackoffException.class, dltStrategy = DltStrategy.NO_DLT)
@KafkaListener(topics = { "web.orders", "web.internal.orders" }, groupId = "orders")
public void handleOrders(String order) throws JsonProcessingException {
Order orderDetails = objectMapper.readValue(order, Order.class);
OrderService.Status orderStatus = orderService.findStatusById(orderDetails.getOrderId());
if (orderStatus.equals(OrderService.Status.ORDER_CONFIRMED)) {
orderService.processOrder(orderDetails);
}
}
⚠️ 必须包含 KafkaBackoffException
才能启用重试机制。为简化演示,外部 OrderService
总是返回 CONFIRMED 状态。processOrder()
方法设置处理时间并保存订单:
@Service
public class OrderService {
HashMap<UUID, Order> orders = new HashMap<>();
public Status findStatusById(UUID orderId) {
return Status.ORDER_CONFIRMED;
}
public void processOrder(Order order) {
order.setOrderProcessedTime(LocalDateTime.now());
orders.put(order.getOrderId(), order);
}
}
4. 自定义延迟消息监听器
Spring Kafka 提供了 KafkaBackoffAwareMessageListenerAdapter
,它通过检查 dueTimestamp
头部实现延迟重试。我们将基于此实现自定义的 DelayedMessageListenerAdapter
,支持按主题配置延迟时间。
关键实现要点:
- 继承
AbstractDelegatingMessageListenerAdapter
- 实现按主题设置延迟时间
- 默认延迟设为 0 秒
public class DelayedMessageListenerAdapter<K, V> extends AbstractDelegatingMessageListenerAdapter<MessageListener<K, V>>
implements AcknowledgingConsumerAwareMessageListener<K, V> {
// 字段声明和构造函数
public void setDelayForTopic(String topic, Duration delay) {
Objects.requireNonNull(topic, "Topic cannot be null");
Objects.requireNonNull(delay, "Delay cannot be null");
this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId));
this.delaysPerTopic.put(topic, delay);
}
public void setDefaultDelay(Duration delay) {
Objects.requireNonNull(delay, "Delay cannot be null");
this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId));
this.defaultDelay = delay;
}
@Override
public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) throws KafkaBackoffException {
this.kafkaConsumerBackoffManager.backOffIfNecessary(createContext(consumerRecord,
consumerRecord.timestamp() + delaysPerTopic.getOrDefault(consumerRecord.topic(), this.defaultDelay)
.toMillis(), consumer));
invokeDelegateOnMessage(consumerRecord, acknowledgment, consumer);
}
private KafkaConsumerBackoffManager.Context createContext(ConsumerRecord<K, V> data, long nextExecutionTimestamp, Consumer<?, ?> consumer) {
return this.kafkaConsumerBackoffManager.createContext(nextExecutionTimestamp,
this.listenerId,
new TopicPartition(data.topic(), data.partition()), consumer);
}
}
工作原理:
- 接收消息时检查主题配置的延迟时间
- 计算下次执行时间戳 = 当前时间戳 + 延迟时间
- 通过
KafkaConsumerBackoffManager
暂停分区或触发重试
5. 监听器配置
ConcurrentKafkaListenerContainerFactory
是 Spring Kafka 的核心组件,负责创建 KafkaListener
容器。 我们需要通过自定义配置注入 DelayedMessageListenerAdapter
。
配置要点:
- 为
web.orders
设置 10 秒延迟 - 其他主题默认 0 秒延迟
- 必须设置
AckMode.RECORD
确保消息可靠投递
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(ConsumerFactory<Object, Object> consumerFactory,
ListenerContainerRegistry registry, TaskScheduler scheduler) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
KafkaConsumerBackoffManager backOffManager = createBackOffManager(registry, scheduler);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.RECORD);
factory.setContainerCustomizer(container -> {
DelayedMessageListenerAdapter<Object, Object> delayedAdapter = wrapWithDelayedMessageListenerAdapter(backOffManager, container);
delayedAdapter.setDelayForTopic("web.orders", Duration.ofSeconds(10));
delayedAdapter.setDefaultDelay(Duration.ZERO);
container.setupMessageListener(delayedAdapter);
});
return factory;
}
@SuppressWarnings("unchecked")
private DelayedMessageListenerAdapter<Object, Object> wrapWithDelayedMessageListenerAdapter(KafkaConsumerBackoffManager backOffManager,
ConcurrentMessageListenerContainer<Object, Object> container) {
return new DelayedMessageListenerAdapter<>((MessageListener<Object, Object>) container.getContainerProperties()
.getMessageListener(), backOffManager, container.getListenerId());
}
private ContainerPartitionPausingBackOffManager createBackOffManager(ListenerContainerRegistry registry, TaskScheduler scheduler) {
return new ContainerPartitionPausingBackOffManager(registry,
new ContainerPausingBackOffHandler(new ListenerContainerPauseService(registry, scheduler)));
}
✅ 关键配置:必须定义 TaskScheduler
用于延迟后恢复暂停的分区:
@Bean
public TaskScheduler taskScheduler() {
return new ThreadPoolTaskScheduler();
}
6. 测试验证
6.1 延迟消费测试
验证 web.orders
主题的订单延迟 10 秒处理:
@Test
void givenKafkaBrokerExists_whenCreateOrderIsReceived_thenMessageShouldBeDelayed() throws Exception {
// Given
var orderId = UUID.randomUUID();
Order order = Order.builder()
.orderId(orderId)
.price(1.0)
.orderGeneratedDateTime(LocalDateTime.now())
.address(List.of("41 Felix Avenue, Luton"))
.build();
String orderString = objectMapper.writeValueAsString(order);
ProducerRecord<String, String> record = new ProducerRecord<>("web.orders", orderString);
// When
testKafkaProducer.send(record)
.get();
await().atMost(Duration.ofSeconds(1800))
.until(() -> {
// then
Map<UUID, Order> orders = orderService.getOrders();
return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId)
.getOrderGeneratedDateTime(), orders.get(orderId)
.getOrderProcessedTime())
.getSeconds() >= 10;
});
}
6.2 即时消费测试
验证 web.internal.orders
主题的订单立即处理(延迟 ≤1 秒):
@Test
void givenKafkaBrokerExists_whenCreateOrderIsReceivedForOtherTopics_thenMessageShouldNotBeDelayed() throws Exception {
// Given
var orderId = UUID.randomUUID();
Order order = Order.builder()
.orderId(orderId)
.price(1.0)
.orderGeneratedDateTime(LocalDateTime.now())
.address(List.of("41 Felix Avenue, Luton"))
.build();
String orderString = objectMapper.writeValueAsString(order);
ProducerRecord<String, String> record = new ProducerRecord<>("web.internal.orders", orderString);
// When
testKafkaProducer.send(record)
.get();
await().atMost(Duration.ofSeconds(1800))
.until(() -> {
// Then
Map<UUID, Order> orders = orderService.getOrders();
System.out.println("Time...." + Duration.between(orders.get(orderId)
.getOrderGeneratedDateTime(), orders.get(orderId)
.getOrderProcessedTime())
.getSeconds());
return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId)
.getOrderGeneratedDateTime(), orders.get(orderId)
.getOrderProcessedTime())
.getSeconds() <= 1;
});
}
7. 总结
本文展示了如何通过 Spring Kafka 实现固定间隔的延迟消息消费。核心思路是:
- 利用 Kafka 的重试机制
- 自定义监听器适配器控制延迟时间
- 按主题配置不同的延迟策略
扩展建议:可通过消息体中的时间字段实现动态延迟控制。完整示例代码可在 GitHub 获取。