1. 概述
本教程将探讨如何使用Spring Kafka库中的@KafkaListener
注解批量处理Kafka消息。Kafka broker作为中间件,负责持久化来自源系统的消息。目标系统被配置为定期轮询Kafka主题/队列并从中读取消息。
这种架构在目标系统或服务宕机时能有效防止消息丢失。当目标服务恢复后,它们会继续处理未消费的消息。因此,这种架构能提升消息的持久性,进而增强系统的容错能力。
2. 为什么要批量处理消息?
多个源或事件生产者同时向同一Kafka队列或主题发送消息是常见场景。这会导致大量消息在队列中堆积。如果目标服务或消费者在一次会话中接收这些海量消息,可能无法高效处理。
这会产生连锁反应,导致瓶颈问题,最终影响所有依赖这些消息的下游流程。因此,消费者或消息监听器需要限制单次处理的消息数量。
要启用批量模式,我们必须根据主题的消息量和应用处理能力配置合适的批次大小。此外,消费者应用应设计为批量处理消息以满足SLA要求。
另外,如果不采用批量处理,消费者需要频繁轮询Kafka主题来逐条获取消息。这种方式会给计算资源带来压力。因此,批量处理比每次轮询处理单条消息高效得多。
但是,批量处理在以下场景可能不适用:
- 消息量较小
- 时间敏感型应用需要即时处理
- 计算和内存资源受限
- 严格的消息顺序至关重要
3. 使用@KafkaListener注解实现批量处理
为理解批量处理,我们先定义一个用例。然后分别用基本消息处理和批量处理两种方式实现。通过对比,我们能更直观地体会批量处理的重要性。
3.1. 用例描述
假设某公司数据中心运行着大量关键IT基础设施设备(如服务器和网络设备)。多个监控工具持续跟踪这些设备的KPI(关键绩效指标)。由于运维团队需要主动监控,他们期望获得实时可操作的分析数据。因此,KPI传输到目标分析应用有严格的SLA要求。
运维团队配置监控工具定期将KPI发送到Kafka主题。消费者应用从主题读取消息后推送到数据湖。数据湖应用再读取数据并生成实时分析。
我们将分别实现支持和不支持批量处理的消费者,分析两种实现的差异和效果。
3.2. 前置条件
开始实现批量处理前,理解Spring Kafka库至关重要。幸运的是,我们在Apache Kafka与Spring入门一文中已讨论过相关内容,这为我们提供了必要的知识储备。
为便于学习,我们需要一个Kafka实例。为快速启动,我们将使用嵌入式Kafka。
最后,我们需要一个程序在Kafka broker中创建事件队列并定期发送示例消息。本质上,我们将使用Junit5来理解相关概念。
3.3. 基础监听器
先从基础监听器开始,它逐条从Kafka broker读取消息。我们在KafkaKpiConsumerWithNoBatchConfig
配置类中定义ConcurrentKafkaListenerContainerFactory
bean:
public class KafkaKpiConsumerWithNoBatchConfig {
@Bean(name = "kafkaKpiListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaKpiBasicListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
}
kafkaKpiBasicListenerContainerFactory()方法返回kafkaKpiListenerContainerFactory bean。该bean用于配置一次只能处理一条消息的基础监听器:
@Component
public class KpiConsumer {
private CountDownLatch latch = new CountDownLatch(1);
private ConsumerRecord<String, String> message;
@Autowired
private DataLakeService dataLakeService;
@KafkaListener(
id = "kpi-listener",
topics = "kpi_topic",
containerFactory = "kafkaKpiListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record) throws InterruptedException {
this.message = record;
latch.await();
List<String> messages = new ArrayList<>();
messages.add(record.value());
dataLakeService.save(messages);
//reset the latch
latch = new CountDownLatch(1);
}
//General getter methods
}
我们在listen()
方法上应用了@KafkaListener
注解。该注解用于设置监听主题和监听容器工厂bean。KpiConsumer
类中的java.util.concurrent.CountDownLatch
对象用于在Junit5测试中控制消息处理流程。我们将用它来理解整个概念。
CountDownLatch#await()
方法会暂停监听器线程,当测试方法调用CountDownLatch#countDown()
时线程恢复。没有这个机制,理解和跟踪消息会很困难。最后,下游的DataLakeService#save()
方法接收到单条消息进行处理。
现在来看跟踪KpiListener
类处理消息的方法:
@RepeatedTest(10)
void givenKafka_whenMessage1OnTopic_thenListenerConsumesMessages(RepetitionInfo repetitionInfo) {
String testNo = String.valueOf(repetitionInfo.getCurrentRepetition());
assertThat(kpiConsumer.getMessage().value()).isEqualTo("Test KPI Message-".concat(testNo));
kpiConsumer.getLatch().countDown();
}
当监控工具将KPI消息发布到kpi_topic Kafka主题时,监听器按消息到达顺序接收它们。
每次方法执行时,都会跟踪到达KpiListener#listen()
方法的消息。确认消息顺序后,释放锁存器,监听器完成处理。
3.4. 支持批量处理的监听器
现在探索Kafka的批量处理支持。首先在Spring配置类中定义ConcurrentKafkaListenerContainerFactory
bean:
@Bean(name="kafkaKpiListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaKpiBatchListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory<>();
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "20");
consumerFactory.updateConfigs(configProps);
factory.setConcurrency(1);
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setPollTimeout(3000);
factory.setBatchListener(true);
return factory;
}
该方法与上一节定义的kafkaKpiBasicListenerContainerFactory()
方法类似。我们通过调用ConsumerFactory#setBatchListener()
方法启用了批量处理。
此外,我们通过ConsumerConfig.MAX_POLL_RECORDS_CONFIG
属性设置了每次轮询的最大消息数量。ConsumerFactory#setConcurrency()
用于设置并发消费线程数,同时处理消息。其他配置可参考Spring Kafka官方文档的配置说明。
另外,还有ConsumerConfig.DEFAULT_FETCH_MAX_BYTES
和ConsumerConfig.DEFAULT_FETCH_MIN_BYTES
等配置属性可用于限制消息大小。
现在来看消费者代码:
@Component
public class KpiBatchConsumer {
private CountDownLatch latch = new CountDownLatch(1);
@Autowired
private DataLakeService dataLakeService;
private List<String> receivedMessages = new ArrayList<>();
@KafkaListener(
id = "kpi-batch-listener",
topics = "kpi_batch_topic",
batch = "true",
containerFactory = "kafkaKpiListenerContainerFactory")
public void listen(ConsumerRecords<String, String> records) throws InterruptedException {
records.forEach(record -> receivedMessages.add(record.value()));
latch.await();
dataLakeService.save(receivedMessages);
latch = new CountDownLatch(1);
}
// Standard getter methods
}
KpiBatchConsumer
与之前定义的KpiConsumer
类类似,只是@KafkaListener
注解多了一个batch
属性。**listen()
方法接收ConsumerRecords
类型参数而非ConsumerRecord
。我们可以遍历ConsumerRecords
对象获取批次中的所有ConsumerRecord
元素**。
监听器也可以按消息到达顺序处理批次中的消息。但在Kafka中维护跨分区的消息批次顺序比较复杂。
这里ConsumerRecord
代表发布到Kafka主题的消息。最终我们调用DataLakeService#save()
方法处理更多消息。CountDownLatch
类的作用与之前相同。
假设有100条KPI消息被推送到kpi_batch_topic
Kafka主题。现在检查监听器执行情况:
@RepeatedTest(5)
void givenKafka_whenMessagesOnTopic_thenListenerConsumesMessages() {
int messageSize = kpiBatchConsumer.getReceivedMessages().size();
assertThat(messageSize % 20).isEqualTo(0);
kpiBatchConsumer.getLatch().countDown();
}
与基础监听器逐条获取消息不同,这次KpiBatchConsumer#listen()
方法接收到包含20条KPI消息的批次。
4. 结论
本文讨论了基础Kafka监听器与启用批量处理的监听器之间的区别。批量处理能同时处理多条消息,提升应用性能。但合理限制批次大小和消息数量对控制应用性能至关重要。因此必须经过仔细严格的基准测试进行优化。
✅ 批量处理优势:
- 提升消息处理效率
- 减少资源消耗
- 适合高吞吐场景
⚠️ 注意事项:
- 需根据业务场景配置批次大小
- 严格顺序要求时需谨慎使用
- 资源受限环境需评估可行性
踩坑提示:批量处理虽好,但别盲目追求大批次!生产环境务必经过充分压测,否则可能适得其反。