1. Overview
In this article, we'll take a close look at how Kafka consumers fetch messages from the broker. We'll learn about the configurable properties that directly influence how many messages a consumer retrieves in a single fetch, and see how tuning these parameters changes the consumer's behavior.
2. Setup
Kafka consumers fetch records from a partition in batches of configurable size. We cannot configure the exact number of records fetched per call, but we can configure the size of those batches, measured in bytes.
For the code examples in this article, we'll need a Spring application that uses the kafka-clients library. We'll create a Java class that internally uses a KafkaConsumer to subscribe to a topic and log the incoming messages. Readers who want to dive deeper can consult our article on the Kafka Consumer API.
The key difference in our example is the logging: instead of logging messages one by one, we collect each batch and log it as a whole. This way, we can see exactly how many messages are fetched by each poll(), along with the batch's first and last offsets and the consumer group ID:
class VariableFetchSizeKafkaListener implements Runnable {
    private final String topic;
    private final KafkaConsumer<String, String> consumer;

    // constructor

    @Override
    public void run() {
        consumer.subscribe(singletonList(topic));
        int pollCount = 1;
        while (true) {
            List<ConsumerRecord<String, String>> records = new ArrayList<>();
            for (var record : consumer.poll(ofMillis(500))) {
                records.add(record);
            }
            if (!records.isEmpty()) {
                String batchOffsets = String.format("%s -> %s", records.get(0).offset(), records.get(records.size() - 1).offset());
                String groupId = consumer.groupMetadata().groupId();
                log.info("groupId: {}, poll: #{}, fetched: #{} records, offsets: {}", groupId, pollCount++, records.size(), batchOffsets);
            }
        }
    }
}
We'll set up the test environment by spinning up a Kafka broker in a Docker container using the Testcontainers library. For details, see our guide on setting up a test environment.
We'll also need a helper method that publishes test data to a given topic. Let's assume we're sending temperature sensor readings to the "engine.sensors.temperature" topic:
void publishTestData(int recordsCount, String topic) {
    List<ProducerRecord<String, String>> records = IntStream.range(0, recordsCount)
      .mapToObj(__ -> new ProducerRecord<>(topic, "key1", "temperature=255F"))
      .collect(toList());
    // publish all records to Kafka
}
All messages use the same key so that they're sent to the same partition, and the payload is a short, fixed text simulating a temperature measurement.
3. Testing the Default Behavior
Let's start by creating a Kafka listener with the default configuration, publish a few messages, and observe how many batches it consumes. Our custom listener uses the Consumer API internally, so we first need to configure and create a KafkaConsumer:
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "default_config");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
For now, we keep the default minimum and maximum fetch sizes. Based on this consumer, we instantiate the listener and run it asynchronously:
CompletableFuture.runAsync(
new VariableFetchSizeKafkaListener(topic, kafkaConsumer)
);
We block the test thread for a few seconds to let the listener consume messages. Since the goal of this article is to observe the listener's behavior, the JUnit 5 test is purely exploratory and contains no assertions. Our starting point looks like this:
@Test
void whenUsingDefaultConfiguration_thenProcessInBatchesOf() throws Exception {
    String topic = "engine.sensors.temperature";
    publishTestData(300, topic);

    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "default_config");
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

    CompletableFuture.runAsync(
      new VariableFetchSizeKafkaListener(topic, kafkaConsumer)
    );

    Thread.sleep(5_000L);
}
Running the test, we can check the logs to see how many records were fetched per poll:
10:48:46.958 [ForkJoinPool.commonPool-worker-2] INFO c.b.k.c.VariableFetchSizeKafkaListener - groupId: default_config, poll: #1, fetched: #300 records, offsets: 0 -> 299
✅ All 300 records were fetched in a single batch because the messages are small: a 4-character key plus a 16-character body is roughly 20 bytes, plus metadata. The default maximum batch size is 1 MiB (1,048,576 bytes).
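To see why a single poll can return all 300 records, here's a rough, illustrative estimate. The ~80 bytes of per-record overhead is an assumption for illustration, not exact Kafka accounting:

```java
// Back-of-the-envelope check: does the whole test data set fit into one default fetch?
public class DefaultFetchEstimate {
    public static void main(String[] args) {
        int recordCount = 300;
        // 4-char key + 16-char value + assumed ~80 bytes of per-record overhead
        int approxBytesPerRecord = 4 + 16 + 80;
        int totalBytes = recordCount * approxBytesPerRecord;
        int defaultMaxPartitionFetchBytes = 1_048_576; // 1 MiB default

        System.out.println("total ~ " + totalBytes + " bytes");
        System.out.println("fits in one fetch: " + (totalBytes < defaultMaxPartitionFetchBytes));
    }
}
```

Even with a generous overhead estimate, the whole data set is about 30 KB, far below the 1 MiB limit, so one fetch is enough.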
4. Configuring the Maximum Partition Fetch Size
Kafka's max.partition.fetch.bytes property determines the maximum amount of data a consumer fetches from a single partition in one request. Even when there are only a few messages, lowering this property forces the consumer to fetch them in smaller batches.
Let's create two listeners, one with a fetch size of 500 B and one with 5 KB. First, we extract the common configuration to avoid duplication:
Properties commonConsumerProperties() {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return props;
}
Then, let's create the first listener (500 B):
Properties fetchSize_500B = commonConsumerProperties();
fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_500B");
fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500");
CompletableFuture.runAsync(
new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(fetchSize_500B))
);
Using different consumer group IDs allows both listeners to consume the same test data. Now, let's add the second listener (5 KB) and complete the test:
@Test
void whenChangingMaxPartitionFetchBytesProperty_thenAdjustBatchSizesWhilePolling() throws Exception {
    String topic = "engine.sensors.temperature";
    publishTestData(300, topic);

    Properties fetchSize_500B = commonConsumerProperties();
    fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_500B");
    fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500");
    CompletableFuture.runAsync(
      new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(fetchSize_500B))
    );

    Properties fetchSize_5KB = commonConsumerProperties();
    fetchSize_5KB.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_5KB");
    fetchSize_5KB.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5000");
    CompletableFuture.runAsync(
      new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(fetchSize_5KB))
    );

    Thread.sleep(10_000L);
}
We expect the first consumer's batches to be roughly one-tenth the size of the second's. Let's analyze the logs:
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #1, fetched: #56 records, offsets: 0 -> 55
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #1, fetched: #5 records, offsets: 0 -> 4
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #2, fetched: #5 records, offsets: 5 -> 9
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #2, fetched: #56 records, offsets: 56 -> 111
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #3, fetched: #5 records, offsets: 10 -> 14
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #3, fetched: #56 records, offsets: 112 -> 167
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #4, fetched: #5 records, offsets: 15 -> 19
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #4, fetched: #51 records, offsets: 168 -> 218
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #5, fetched: #5 records, offsets: 20 -> 24
[...]
⚠️ The number of records per batch depends on the record size and metadata: note that the max_fetch_size_5KB consumer fetched fewer records (51) on its fourth poll, which shows that the actual record size determines how many records fit in a batch.
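The observed batch sizes line up with a rough per-record estimate. Assuming about 100 bytes per record including metadata (an illustrative figure, not exact broker accounting), we can sketch the expected counts:

```java
// Estimate how many records fit into a given max.partition.fetch.bytes budget
public class MaxFetchEstimate {
    static int estimateRecordsPerFetch(int maxPartitionFetchBytes, int approxBytesPerRecord) {
        return maxPartitionFetchBytes / approxBytesPerRecord;
    }

    public static void main(String[] args) {
        int approxBytesPerRecord = 100; // assumed average, including metadata
        System.out.println("500 B  -> ~" + estimateRecordsPerFetch(500, approxBytesPerRecord) + " records");
        System.out.println("5000 B -> ~" + estimateRecordsPerFetch(5000, approxBytesPerRecord) + " records");
    }
}
```

The estimates (~5 and ~50 records) are close to the observed 5 and 56 records per poll; the small difference suggests the actual per-record overhead is a bit lower than our assumed figure.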
5. Configuring the Minimum Fetch Size
The Consumer API lets us customize the minimum fetch size through the fetch.min.bytes property. This property specifies the minimum amount of data the broker should return in a response; if less data is available, the broker delays its response. To demonstrate this, let's add a delay to the test publishing method:
@Test
void whenChangingMinFetchBytesProperty_thenAdjustWaitTimeWhilePolling() throws Exception {
    String topic = "engine.sensors.temperature";
    publishTestData(300, topic, 100L);
    // ...
}

void publishTestData(int measurementsCount, String topic, long delayInMillis) {
    // ...
}
Let's start with a listener that uses the default configuration (fetch.min.bytes=1):
// fetch.min.bytes = 1 byte (default)
Properties minFetchSize_1B = commonConsumerProperties();
minFetchSize_1B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "min_fetch_size_1B");
CompletableFuture.runAsync(
new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(minFetchSize_1B))
);
Because of the introduced delay, we expect each record to be fetched individually: many single-record batches, with consumption keeping pace with production (one record every 100 milliseconds). Running the test, let's analyze the logs:
14:23:22.368 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #1, fetched: #1 records, offsets: 0 -> 0
14:23:22.472 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #2, fetched: #1 records, offsets: 1 -> 1
14:23:22.582 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #3, fetched: #1 records, offsets: 2 -> 2
14:23:22.689 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #4, fetched: #1 records, offsets: 3 -> 3
[...]
Increasing the fetch.min.bytes value forces the consumer to wait for more data to accumulate:

// fetch.min.bytes = 500 bytes
Properties minFetchSize_500B = commonConsumerProperties();
minFetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "min_fetch_size_500B");
minFetchSize_500B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500");
CompletableFuture.runAsync(
  new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(minFetchSize_500B))
);

With the property set to 500 bytes, the consumer waits longer and fetches more data per poll. Running the example, we observe:

14:24:49.303 [worker-3] INFO - groupId: min_fetch_size_500B, poll: #1, fetched: #6 records, offsets: 0 -> 5
14:24:49.812 [worker-3] INFO - groupId: min_fetch_size_500B, poll: #2, fetched: #4 records, offsets: 6 -> 9
14:24:50.315 [worker-3] INFO - groupId: min_fetch_size_500B, poll: #3, fetched: #5 records, offsets: 10 -> 14
14:24:50.819 [worker-3] INFO - groupId: min_fetch_size_500B, poll: #4, fetched: #5 records, offsets: 15 -> 19
[...]
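The roughly 500 ms gaps between polls also follow from a back-of-the-envelope calculation: with one record published every 100 ms and an assumed ~100 bytes per record (illustrative figures), the broker needs about five records before the 500-byte minimum is met:

```java
// Estimate how long the broker delays a response for a given fetch.min.bytes
public class MinFetchEstimate {
    public static void main(String[] args) {
        int fetchMinBytes = 500;
        int approxBytesPerRecord = 100; // assumed average, including metadata
        long publishDelayMs = 100L;     // one record published every 100 ms

        int recordsNeeded = fetchMinBytes / approxBytesPerRecord;
        long expectedWaitMs = recordsNeeded * publishDelayMs;
        System.out.println("~" + recordsNeeded + " records per poll, ~" + expectedWaitMs + " ms between polls");
    }
}
```

This matches the logs above, which show 4-6 records per poll at roughly 500 ms intervals. Note that the broker never waits longer than fetch.max.wait.ms (500 ms by default), which also caps this delay.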
6. Conclusion
In this article, we explored how Kafka consumers fetch data from the broker:

- Default behavior: the consumer fetches as soon as new records are available, splitting the data into maximum-size batches when it exceeds 1,048,576 bytes
- ✅ fetch.min.bytes controls the minimum amount of data fetched, which affects how long the consumer waits
- ✅ max.partition.fetch.bytes controls the maximum amount fetched per partition, which affects the batch size

By configuring these two properties appropriately, we can tailor Kafka's consumption behavior to our specific needs.
The source code for the examples is available in the GitHub repository.