1. 概述

本教程将探讨如何使用Spring Kafka库中的@KafkaListener注解批量处理Kafka消息。Kafka broker作为中间件,负责持久化来自源系统的消息。目标系统被配置为定期轮询Kafka主题/队列并从中读取消息。

这种架构在目标系统或服务宕机时能有效防止消息丢失。当目标服务恢复后,它们会继续处理未消费的消息。因此,这种架构能提升消息的持久性,进而增强系统的容错能力。

2. 为什么要批量处理消息?

多个源或事件生产者同时向同一Kafka队列或主题发送消息是常见场景。这会导致大量消息在队列中堆积。如果目标服务或消费者在一次会话中接收这些海量消息,可能无法高效处理。

这会产生连锁反应,导致瓶颈问题,最终影响所有依赖这些消息的下游流程。因此,消费者或消息监听器需要限制单次处理的消息数量。

要启用批量模式,我们必须根据主题的消息量和应用处理能力配置合适的批次大小。此外,消费者应用应设计为批量处理消息以满足SLA要求。

另外,如果不采用批量处理,消费者需要频繁轮询Kafka主题来逐条获取消息。这种方式会给计算资源带来压力。因此,批量处理比每次轮询处理单条消息高效得多。

但是,批量处理在以下场景可能不适用

  • 消息量较小
  • 时间敏感型应用需要即时处理
  • 计算和内存资源受限
  • 严格的消息顺序至关重要

3. 使用@KafkaListener注解实现批量处理

为理解批量处理,我们先定义一个用例。然后分别用基本消息处理和批量处理两种方式实现。通过对比,我们能更直观地体会批量处理的重要性。

3.1. 用例描述

假设某公司数据中心运行着大量关键IT基础设施设备(如服务器和网络设备)。多个监控工具持续跟踪这些设备的KPI(关键绩效指标)。由于运维团队需要主动监控,他们期望获得实时可操作的分析数据。因此,KPI传输到目标分析应用有严格的SLA要求。

运维团队配置监控工具定期将KPI发送到Kafka主题。消费者应用从主题读取消息后推送到数据湖。数据湖应用再读取数据并生成实时分析。

我们将分别实现支持和不支持批量处理的消费者,分析两种实现的差异和效果。

3.2. 前置条件

开始实现批量处理前,理解Spring Kafka库至关重要。幸运的是,我们在Apache Kafka与Spring入门一文中已讨论过相关内容,这为我们提供了必要的知识储备。

为便于学习,我们需要一个Kafka实例。为快速启动,我们将使用嵌入式Kafka

最后,我们需要一个程序在Kafka broker中创建事件队列并定期发送示例消息。本质上,我们将使用Junit5来理解相关概念。

3.3. 基础监听器

先从基础监听器开始,它逐条从Kafka broker读取消息。我们在KafkaKpiConsumerWithNoBatchConfig配置类中定义ConcurrentKafkaListenerContainerFactory bean:

public class KafkaKpiConsumerWithNoBatchConfig {

    @Bean(name = "kafkaKpiListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaKpiBasicListenerContainerFactory(
      ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory
          = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(1);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
}

kafkaKpiBasicListenerContainerFactory()方法返回kafkaKpiListenerContainerFactory bean。该bean用于配置一次只能处理一条消息的基础监听器

@Component
public class KpiConsumer {
    private CountDownLatch latch = new CountDownLatch(1);

    private ConsumerRecord<String, String> message;
    @Autowired
    private DataLakeService dataLakeService;

    @KafkaListener(
      id = "kpi-listener",
      topics = "kpi_topic",
      containerFactory = "kafkaKpiListenerContainerFactory")
    public void listen(ConsumerRecord<String, String> record) throws InterruptedException {

        this.message = record;

        latch.await();

        List<String> messages = new ArrayList<>();
        messages.add(record.value());
        dataLakeService.save(messages);
        //reset the latch
        latch = new CountDownLatch(1);
    }
   //General getter methods
}

我们在listen()方法上应用了@KafkaListener注解。该注解用于设置监听主题和监听容器工厂bean。KpiConsumer类中的java.util.concurrent.CountDownLatch对象用于在Junit5测试中控制消息处理流程。我们将用它来理解整个概念。

CountDownLatch#await()方法会暂停监听器线程,当测试方法调用CountDownLatch#countDown()时线程恢复。没有这个机制,理解和跟踪消息会很困难。最后,下游的DataLakeService#save()方法接收到单条消息进行处理。

现在来看跟踪KpiListener类处理消息的方法:

@RepeatedTest(10)
void givenKafka_whenMessage1OnTopic_thenListenerConsumesMessages(RepetitionInfo repetitionInfo) {
    String testNo = String.valueOf(repetitionInfo.getCurrentRepetition());
    assertThat(kpiConsumer.getMessage().value()).isEqualTo("Test KPI Message-".concat(testNo));
    kpiConsumer.getLatch().countDown();
}

当监控工具将KPI消息发布到kpi_topic Kafka主题时,监听器按消息到达顺序接收它们。

每次方法执行时,都会跟踪到达KpiListener#listen()方法的消息。确认消息顺序后,释放锁存器,监听器完成处理。

3.4. 支持批量处理的监听器

现在探索Kafka的批量处理支持。首先在Spring配置类中定义ConcurrentKafkaListenerContainerFactory bean:

@Bean(name="kafkaKpiListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaKpiBatchListenerContainerFactory(
  ConsumerFactory<String, String> consumerFactory) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory
      = new ConcurrentKafkaListenerContainerFactory<>();

    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "20");
    consumerFactory.updateConfigs(configProps);
    factory.setConcurrency(1);
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setPollTimeout(3000);
    factory.setBatchListener(true);

    return factory;
}

该方法与上一节定义的kafkaKpiBasicListenerContainerFactory()方法类似。我们通过调用ConsumerFactory#setBatchListener()方法启用了批量处理

此外,我们通过ConsumerConfig.MAX_POLL_RECORDS_CONFIG属性设置了每次轮询的最大消息数量。ConsumerFactory#setConcurrency()用于设置并发消费线程数,同时处理消息。其他配置可参考Spring Kafka官方文档的配置说明

另外,还有ConsumerConfig.DEFAULT_FETCH_MAX_BYTESConsumerConfig.DEFAULT_FETCH_MIN_BYTES等配置属性可用于限制消息大小

现在来看消费者代码:

@Component
public class KpiBatchConsumer {
    private CountDownLatch latch = new CountDownLatch(1);
    @Autowired
    private DataLakeService dataLakeService;
    private List<String> receivedMessages = new ArrayList<>();

    @KafkaListener(
      id = "kpi-batch-listener",
      topics = "kpi_batch_topic",
      batch = "true",
      containerFactory = "kafkaKpiListenerContainerFactory")
    public void listen(ConsumerRecords<String, String> records) throws InterruptedException {        
        records.forEach(record -> receivedMessages.add(record.value()));

        latch.await();

        dataLakeService.save(receivedMessages);
        latch = new CountDownLatch(1);
    }
    // Standard getter methods
}

KpiBatchConsumer与之前定义的KpiConsumer类类似,只是@KafkaListener注解多了一个batch属性。**listen()方法接收ConsumerRecords类型参数而非ConsumerRecord。我们可以遍历ConsumerRecords对象获取批次中的所有ConsumerRecord元素**。

监听器也可以按消息到达顺序处理批次中的消息。但在Kafka中维护跨分区的消息批次顺序比较复杂。

这里ConsumerRecord代表发布到Kafka主题的消息。最终我们调用DataLakeService#save()方法处理更多消息。CountDownLatch类的作用与之前相同。

假设有100条KPI消息被推送到kpi_batch_topic Kafka主题。现在检查监听器执行情况:

@RepeatedTest(5)
void givenKafka_whenMessagesOnTopic_thenListenerConsumesMessages() {
    int messageSize = kpiBatchConsumer.getReceivedMessages().size();

    assertThat(messageSize % 20).isEqualTo(0);
    kpiBatchConsumer.getLatch().countDown();
}

与基础监听器逐条获取消息不同,这次KpiBatchConsumer#listen()方法接收到包含20条KPI消息的批次。

4. 结论

本文讨论了基础Kafka监听器与启用批量处理的监听器之间的区别。批量处理能同时处理多条消息,提升应用性能。但合理限制批次大小和消息数量对控制应用性能至关重要。因此必须经过仔细严格的基准测试进行优化。

批量处理优势

  • 提升消息处理效率
  • 减少资源消耗
  • 适合高吞吐场景

⚠️ 注意事项

  • 需根据业务场景配置批次大小
  • 严格顺序要求时需谨慎使用
  • 资源受限环境需评估可行性

踩坑提示:批量处理虽好,但别盲目追求大批次!生产环境务必经过充分压测,否则可能适得其反。


原始标题:Consuming Messages in Batch in Kafka Using @KafkaListener | Baeldung