1. 概述

本文将深入探讨Kafka消费者如何从Broker拉取消息。我们将学习直接影响消费者单次拉取消息数量的可配置属性,并分析调整这些参数如何改变消费者的行为模式。

2. 环境搭建

Kafka消费者按可配置大小的批次从分区拉取记录。我们无法精确配置单次拉取的记录数量,但可以配置以字节为单位的批次大小。

本文代码示例需要使用kafka-clients库的Spring应用。我们将创建一个内部使用KafkaConsumer订阅主题并记录消息的Java类。想深入学习的读者可以参考我们的Kafka Consumer API文章。

示例中的关键改动是日志记录方式:我们将收集整个批次的消息再统一记录,而非逐条记录。这样能清晰看到每次poll()拉取的消息数量,同时记录批次的首末偏移量和消费者组ID:

class VariableFetchSizeKafkaListener implements Runnable {
    private final String topic;
    private final KafkaConsumer<String, String> consumer;
    
    // 构造函数

    @Override
    public void run() {
        consumer.subscribe(singletonList(topic));
        int pollCount = 1;

        while (true) {
            List<ConsumerRecord<String, String>> records = new ArrayList<>();
            for (var record : consumer.poll(ofMillis(500))) {
                records.add(record);
            }
            if (!records.isEmpty()) {
                String batchOffsets = String.format("%s -> %s", records.get(0).offset(), records.get(records.size() - 1).offset());
                String groupId = consumer.groupMetadata().groupId();
                log.info("groupId: {}, poll: #{}, fetched: #{} records, offsets: {}", groupId, pollCount++, records.size(), batchOffsets);
            }
        }
    }
}

我们将使用Testcontainers库启动Kafka Broker的Docker容器搭建测试环境。 具体配置可参考测试环境搭建指南

我们还需要一个向指定主题发布测试数据的方法。假设向"engine.sensor.temperature"主题发送温度传感器数据:

void publishTestData(int recordsCount, String topic) {
    List<ProducerRecord<String, String>> records = IntStream.range(0, recordsCount)
      .mapToObj(__ -> new ProducerRecord<>(topic, "key1", "temperature=255F"))
      .collect(toList());
    // 发布所有消息到Kafka
}

所有消息使用相同key确保发送到同一分区,负载采用固定短文本模拟温度测量值。

3. 测试默认行为

我们先创建使用默认配置的Kafka监听器,发布几条消息观察消费批次情况。 自定义监听器内部使用Consumer API,因此需要先配置创建KafkaConsumer

Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "default_config");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

暂时使用默认的最小/最大拉取大小配置。基于此消费者实例化监听器并异步运行:

CompletableFuture.runAsync(
  new VariableFetchSizeKafkaListener(topic, kafkaConsumer)
);

阻塞测试线程数秒等待消息消费。本文重点在于观察监听器行为,因此使用JUnit5测试仅作行为探索,不包含断言。起始测试代码如下:

@Test
void whenUsingDefaultConfiguration_thenProcessInBatchesOf() throws Exception {
    String topic = "engine.sensors.temperature";
    publishTestData(300, topic);

    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "default_config");
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

    CompletableFuture.runAsync(
      new VariableFetchSizeKafkaListener(topic, kafkaConsumer)
    );

    Thread.sleep(5_000L);
}

运行测试检查日志中的单次拉取记录数:

10:48:46.958 [ForkJoinPool.commonPool-worker-2] INFO  c.b.k.c.VariableFetchSizeKafkaListener - groupId: default_config, poll: #1, fetched: #300 records, offsets: 0 -> 299

✅ 所有300条记录在单批次拉取完成,因为消息体积小(key 4字符 + body 16字符 ≈ 20字节 + 元数据)。默认最大批次大小为1 MiB(1,048,576字节)。

4. 配置最大分区拉取大小

Kafka的max.partition.fetch.bytes属性决定消费者单次从单个分区拉取的最大数据量。 即使消息数量少,调整此属性也能强制分批拉取。

创建两个监听器分别设置500B和5KB的拉取大小。先提取公共配置避免重复:

Properties commonConsumerProperties() {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return props;
}

创建第一个监听器(500B):

Properties fetchSize_500B = commonConsumerProperties();
fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_500B");
fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500");
CompletableFuture.runAsync(
  new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(fetchSize_500B))
);

不同消费者组ID使监听器能消费相同测试数据。添加第二个监听器(5KB)完成测试:

@Test
void whenChangingMaxPartitionFetchBytesProperty_thenAdjustBatchSizesWhilePolling() throws Exception {
    String topic = "engine.sensors.temperature";
    publishTestData(300, topic);
    
    Properties fetchSize_500B = commonConsumerProperties();
    fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_500B");
    fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500");
    CompletableFuture.runAsync(
      new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(fetchSize_500B))
    );

    Properties fetchSize_5KB = commonConsumerProperties();
    fetchSize_5KB.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_5KB");
    fetchSize_5KB.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5000");
    CompletableFuture.runAsync(
      new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(fetchSize_5KB))
    );

    Thread.sleep(10_000L);
}

预期第一个消费者的批次大小约为第二个的十分之一。分析日志:

[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #1, fetched: #56 records, offsets: 0 -> 55
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #1, fetched: #5 records, offsets: 0 -> 4
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #2, fetched: #5 records, offsets: 5 -> 9
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #2, fetched: #56 records, offsets: 56 -> 111
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #3, fetched: #5 records, offsets: 10 -> 14
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #3, fetched: #56 records, offsets: 112 -> 167
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #4, fetched: #5 records, offsets: 15 -> 19
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #4, fetched: #51 records, offsets: 168 -> 218
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #5, fetched: #5 records, offsets: 20 -> 24
[...]

⚠️ 批次记录数取决于记录大小和元数据:注意max_fetch_size_5KB消费者第四次拉取时记录数减少(51条),说明实际记录大小会影响批次内记录数量。

5. 配置最小拉取大小

Consumer API通过fetch.min.bytes属性自定义最小拉取大小。 该属性指定Broker响应的最小数据量,未达到时Broker会延迟响应。为验证效果,在测试发布方法中添加延迟:

@Test
void whenChangingMinFetchBytesProperty_thenAdjustWaitTimeWhilePolling() throws Exception {
    String topic = "engine.sensors.temperature";
    publishTestData(300, topic, 100L);  
    // ...
}

void publishTestData(int measurementsCount, String topic, long delayInMillis) {
    // ...
}

创建使用默认配置(fetch.min.bytes=1)的监听器:

// fetch.min.bytes = 1 byte (默认)
Properties minFetchSize_1B = commonConsumerProperties();
minFetchSize_1B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "min_fetch_size_1B");
CompletableFuture.runAsync(
  new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(minFetchSize_1B))
);

由于引入的延迟,预期每条记录会被单独拉取。 即大量单记录批次,消费速度与生产速度同步(每100毫秒)。运行测试分析日志:

14:23:22.368 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #1, fetched: #1 records, offsets: 0 -> 0
14:23:22.472 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #2, fetched: #1 records, offsets: 1 -> 1
14:23:22.582 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #3, fetched: #1 records, offsets: 2 -> 2
14:23:22.689 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #4, fetched: #1 records, offsets: 3 -> 3
[...]

增大fetch.min.bytes值强制消费者等待更多数据积累:

// fetch.min.bytes = 500 bytes
Properties minFetchSize_500B = commonConsumerProperties();
minFetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "mim_fetch_size_500B");
minFetchSize_500B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500");
CompletableFuture.runAsync(
  new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(minFetchSize_500B))
);

设置为500字节后,消费者会等待更长时间拉取更多数据。运行示例观察结果:

14:24:49.303 [worker-3] INFO - groupId: mim_fetch_size_500B, poll: #1, fetched: #6 records, offsets: 0 -> 5
14:24:49.812 [worker-3] INFO - groupId: mim_fetch_size_500B, poll: #2, fetched: #4 records, offsets: 6 -> 9
14:24:50.315 [worker-3] INFO - groupId: mim_fetch_size_500B, poll: #3, fetched: #5 records, offsets: 10 -> 14
14:24:50.819 [worker-3] INFO - groupId: mim_fetch_size_500B, poll: #4, fetched: #5 records, offsets: 15 -> 19
[...]

6. 总结

本文探讨了Kafka消费者从Broker拉取数据的机制:

  • 默认行为:有新记录即拉取,超过1,048,576字节时拆分为最大批次
  • ✅ 通过调整fetch.min.bytes控制最小拉取量,影响等待时间
  • ✅ 通过max.partition.fetch.bytes控制单分区最大拉取量,影响批次大小

合理配置这两个属性,可根据具体需求定制Kafka消费行为,避免踩坑。

示例源码见GitHub仓库


原始标题:Read Multiple Messages with Apache Kafka | Baeldung