1. 引言
消费者组通过允许多个消费者从同一主题读取数据,帮助构建更具扩展性的Kafka应用。本教程将深入理解消费者组及其在消费者间重平衡分区的机制。
2. 什么是消费者组?
消费者组是与一个或多个主题关联的一组唯一消费者。每个消费者可读取零个、一个或多个分区,但每个分区在特定时间点只能分配给一个消费者。当组成员变化时,分区分配会随之改变,这个过程称为组重平衡。
消费者组是Kafka应用的核心组件,它通过将相似消费者分组,实现从分区主题的并行读取,从而提升应用性能和可扩展性。
2.1. 组协调器和组领导者
创建消费者组时,Kafka会同时创建组协调器。协调器定期接收来自消费者的心跳请求。若消费者停止发送心跳,协调器会认为该消费者已离开组或崩溃,这会触发分区重平衡。
第一个请求加入组的消费者成为组领导者。当重平衡发生时,领导者从协调器获取成员列表,然后根据partition.assignment.strategy
配置的自定义策略重新分配分区。
2.2. 已提交偏移量
Kafka使用已提交偏移量跟踪主题的最后读取位置。该偏移量表示消费者成功处理消息的位置,也是后续读取的起点。
所有分区的已提交偏移量存储在名为__consumer_offsets
的内部主题中。由于主题具有持久性和容错性,我们可以信任其存储的信息。
2.3. 分区重平衡
分区重平衡会改变分区的所有权。当新消费者加入组、现有消费者崩溃或主动退出时,Kafka会自动执行重平衡:
- ✅ 新消费者加入:Kafka公平地将其他消费者的部分分区分配给新成员
- ⚠️ 消费者崩溃:其分区必须重新分配给组内剩余消费者,避免消息丢失
重平衡期间,消费者无法消费消息,导致服务短暂不可用。同时,消费者会丢失状态并需重新计算缓存值,这会降低消息处理速度。
3. 应用程序设置
本节将配置Spring Kafka应用的基础组件。
3.1. 创建基本配置
首先配置主题及其分区:
@Configuration
public class KafkaTopicConfiguration {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
public NewTopic celciusTopic() {
return TopicBuilder.name("topic-1")
.partitions(2)
.build();
}
}
接着配置生产者:
@Configuration
public class KafkaProducerConfiguration {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, Double> kafkaProducer() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DoubleSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Double> kafkaProducerTemplate() {
return new KafkaTemplate<>(kafkaProducer());
}
}
最后配置消费者:
@Configuration
public class KafkaConsumerConfiguration {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ConsumerFactory<String, Double> kafkaConsumer() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DoubleDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Double> kafkaConsumerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Double> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumer());
return factory;
}
}
3.2. 设置消费者
创建两个属于group-1
的消费者:
@Service
public class MessageConsumerService {
@KafkaListener(topics = "topic-1", groupId = "group-1")
public void consumer0(ConsumerRecord<?, ?> consumerRecord) {
trackConsumedPartitions("consumer-0", consumerRecord);
}
@KafkaListener(topics = "topic-1", groupId = "group-1")
public void consumer1(ConsumerRecord<?, ?> consumerRecord) {
trackConsumedPartitions("consumer-1", consumerRecord);
}
}
添加分区跟踪逻辑:
Map<String, Set<Integer>> consumedPartitions = new ConcurrentHashMap<>();
private void trackConsumedPartitions(String key, ConsumerRecord<?, ?> record) {
consumedPartitions.computeIfAbsent(key, k -> new HashSet<>());
consumedPartitions.computeIfPresent(key, (k, v) -> {
v.add(record.partition());
return v;
});
}
4. 当消费者离开时可视化分区重平衡
通过集成测试观察消费者离开时的重平衡过程:
@SpringBootTest(classes = ManagingConsumerGroupsApplicationKafkaApp.class)
@EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class ManagingConsumerGroupsIntegrationTest {
private static final String CONSUMER_1_IDENTIFIER = "org.springframework.kafka.KafkaListenerEndpointContainer#1";
private static final int TOTAL_PRODUCED_MESSAGES = 50000;
private static final int MESSAGE_WHERE_CONSUMER_1_LEAVES_GROUP = 10000;
@Autowired
KafkaTemplate<String, Double> kafkaTemplate;
@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
MessageConsumerService consumerService;
}
测试方法实现:
@Test
public void givenContinuousMessageFlow_whenAConsumerLeavesTheGroup_thenKafkaTriggersPartitionRebalance() throws InterruptedException {
int currentMessage = 0;
do {
kafkaTemplate.send("topic-1", RandomUtils.nextDouble(10.0, 20.0));
Thread.sleep(0,100);
currentMessage++;
if (currentMessage == MESSAGE_WHERE_CONSUMER_1_LEAVES_GROUP) {
String containerId = kafkaListenerEndpointRegistry.getListenerContainerIds()
.stream()
.filter(a -> a.equals(CONSUMER_1_IDENTIFIER))
.findFirst()
.orElse("");
MessageListenerContainer container = kafkaListenerEndpointRegistry.getListenerContainer(containerId);
Objects.requireNonNull(container).stop();
kafkaListenerEndpointRegistry.unregisterListenerContainer(containerId);
if(currentMessage % 1000 == 0){
log.info("Processed {} of {}", currentMessage, TOTAL_PRODUCED_MESSAGES);
}
}
} while (currentMessage != TOTAL_PRODUCED_MESSAGES);
assertEquals(1, consumerService.consumedPartitions.get("consumer-1").size());
assertEquals(2, consumerService.consumedPartitions.get("consumer-0").size());
}
关键日志输出:
INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-group-1-1, groupId=group-1] Member consumer-group-1-1-4eb63bbc-336d-44d6-9d41-0a862029ba95 sending LeaveGroup request to coordinator localhost:9092
INFO k.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Preparing to rebalance group group-1 in state PreparingRebalance with old generation 2 (__consumer_offsets-4) (reason: Removing member consumer-group-1-1-4eb63bbc-336d-44d6-9d41-0a862029ba95 on LeaveGroup)
测试验证:当consumer-1
离开后,其分区被重新分配给consumer-0
,后者最终消费了两个分区的数据。
5. 有用的消费者配置
5.1. 会话超时和心跳频率
session.timeout.ms
:协调器等待心跳的最大时间(默认45秒)heartbeat.interval.ms
:心跳发送频率
⚠️ 关键原则:heartbeat.interval.ms
必须小于session.timeout.ms
(通常设为后者的33%)
配置权衡:
- ✅ 降低超时时间:加快故障恢复速度,提升可用性
- ❌ 过低超时时间:在Kafka 0.10.1.0前可能因消息处理阻塞导致误判(已通过KIP-62修复)
- ❌ 过高超时时间:故障检测延迟,但可避免旧版本误判问题
5.2. 最大轮询间隔时间
max.poll.interval.ms
指定消费者空闲的最大时间(默认5分钟)。超时后消费者停止发送心跳,直到会话超时离开组。
配置影响:
- ✅ 增加间隔时间:减少低吞吐量环境下的不必要重平衡
- ❌ 过大间隔时间:增加空闲消费者数量,提高基础设施成本
6. 结论
本文深入探讨了Kafka消费者组的核心机制:
- 组协调器和领导者的角色分工
- 已提交偏移量的管理方式
- 分区重平衡的触发条件与影响
通过实践演示了消费者离开时的自动重平衡过程,并分析了关键配置参数的权衡。理解这些机制对优化Kafka应用性能至关重要,特别是在高可用性和资源效率之间取得平衡时。
本文源码可在GitHub获取。