1. 引言

消费者组通过允许多个消费者从同一主题读取数据,帮助构建更具扩展性的Kafka应用。本教程将深入理解消费者组及其在消费者间重平衡分区的机制。

2. 什么是消费者组?

消费者组是与一个或多个主题关联的一组唯一消费者。每个消费者可读取零个、一个或多个分区,但每个分区在特定时间点只能分配给一个消费者。当组成员变化时,分区分配会随之改变,这个过程称为组重平衡

消费者组是Kafka应用的核心组件,它通过将相似消费者分组,实现从分区主题的并行读取,从而提升应用性能和可扩展性。

2.1. 组协调器和组领导者

创建消费者组时,Kafka会同时创建组协调器。协调器定期接收来自消费者的心跳请求。若消费者停止发送心跳,协调器会认为该消费者已离开组或崩溃,这会触发分区重平衡。

第一个请求加入组的消费者成为组领导者。当重平衡发生时,领导者从协调器获取成员列表,然后根据partition.assignment.strategy配置的自定义策略重新分配分区。

2.2. 已提交偏移量

Kafka使用已提交偏移量跟踪主题的最后读取位置。该偏移量表示消费者成功处理消息的位置,也是后续读取的起点。

所有分区的已提交偏移量存储在名为__consumer_offsets的内部主题中。由于主题具有持久性和容错性,我们可以信任其存储的信息。

2.3. 分区重平衡

分区重平衡会改变分区的所有权。当新消费者加入组、现有消费者崩溃或主动退出时,Kafka会自动执行重平衡:

  • 新消费者加入:Kafka公平地将其他消费者的部分分区分配给新成员
  • ⚠️ 消费者崩溃:其分区必须重新分配给组内剩余消费者,避免消息丢失

重平衡期间,消费者无法消费消息,导致服务短暂不可用。同时,消费者会丢失状态并需重新计算缓存值,这会降低消息处理速度。

3. 应用程序设置

本节将配置Spring Kafka应用的基础组件。

3.1. 创建基本配置

首先配置主题及其分区:

@Configuration
public class KafkaTopicConfiguration {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    public NewTopic celciusTopic() {
        return TopicBuilder.name("topic-1")
            .partitions(2)
            .build();
    }
}

接着配置生产者:

@Configuration
public class KafkaProducerConfiguration {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, Double> kafkaProducer() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DoubleSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Double> kafkaProducerTemplate() {
        return new KafkaTemplate<>(kafkaProducer());
    }
}

最后配置消费者:

@Configuration
public class KafkaConsumerConfiguration {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ConsumerFactory<String, Double> kafkaConsumer() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DoubleDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Double> kafkaConsumerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Double> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumer());
        return factory;
    }
}

3.2. 设置消费者

创建两个属于group-1的消费者:

@Service
public class MessageConsumerService {
    @KafkaListener(topics = "topic-1", groupId = "group-1")
    public void consumer0(ConsumerRecord<?, ?> consumerRecord) {
        trackConsumedPartitions("consumer-0", consumerRecord);
    }

    @KafkaListener(topics = "topic-1", groupId = "group-1")
    public void consumer1(ConsumerRecord<?, ?> consumerRecord) {
        trackConsumedPartitions("consumer-1", consumerRecord);
    }
}

添加分区跟踪逻辑:

Map<String, Set<Integer>> consumedPartitions = new ConcurrentHashMap<>();

private void trackConsumedPartitions(String key, ConsumerRecord<?, ?> record) {
    consumedPartitions.computeIfAbsent(key, k -> new HashSet<>());
    consumedPartitions.computeIfPresent(key, (k, v) -> {
        v.add(record.partition());
        return v;
    });
}

4. 当消费者离开时可视化分区重平衡

通过集成测试观察消费者离开时的重平衡过程:

@SpringBootTest(classes = ManagingConsumerGroupsApplicationKafkaApp.class)
@EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class ManagingConsumerGroupsIntegrationTest {

    private static final String CONSUMER_1_IDENTIFIER = "org.springframework.kafka.KafkaListenerEndpointContainer#1";
    private static final int TOTAL_PRODUCED_MESSAGES = 50000;
    private static final int MESSAGE_WHERE_CONSUMER_1_LEAVES_GROUP = 10000;

    @Autowired
    KafkaTemplate<String, Double> kafkaTemplate;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    MessageConsumerService consumerService;
}

测试方法实现:

@Test
public void givenContinuousMessageFlow_whenAConsumerLeavesTheGroup_thenKafkaTriggersPartitionRebalance() throws InterruptedException {
    int currentMessage = 0;

    do {
        kafkaTemplate.send("topic-1", RandomUtils.nextDouble(10.0, 20.0));
        Thread.sleep(0,100);
        currentMessage++;

        if (currentMessage == MESSAGE_WHERE_CONSUMER_1_LEAVES_GROUP) {
            String containerId = kafkaListenerEndpointRegistry.getListenerContainerIds()
                .stream()
                .filter(a -> a.equals(CONSUMER_1_IDENTIFIER))
                .findFirst()
                .orElse("");
            MessageListenerContainer container = kafkaListenerEndpointRegistry.getListenerContainer(containerId);
            Objects.requireNonNull(container).stop();
            kafkaListenerEndpointRegistry.unregisterListenerContainer(containerId);
            if(currentMessage % 1000 == 0){
                log.info("Processed {} of {}", currentMessage, TOTAL_PRODUCED_MESSAGES);
            }
        }
    } while (currentMessage != TOTAL_PRODUCED_MESSAGES);

    assertEquals(1, consumerService.consumedPartitions.get("consumer-1").size());
    assertEquals(2, consumerService.consumedPartitions.get("consumer-0").size());
}

关键日志输出:

INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-group-1-1, groupId=group-1] Member consumer-group-1-1-4eb63bbc-336d-44d6-9d41-0a862029ba95 sending LeaveGroup request to coordinator localhost:9092
INFO  k.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Preparing to rebalance group group-1 in state PreparingRebalance with old generation 2 (__consumer_offsets-4) (reason: Removing member consumer-group-1-1-4eb63bbc-336d-44d6-9d41-0a862029ba95 on LeaveGroup)

测试验证:当consumer-1离开后,其分区被重新分配给consumer-0,后者最终消费了两个分区的数据。

5. 有用的消费者配置

5.1. 会话超时和心跳频率

  • session.timeout.ms:协调器等待心跳的最大时间(默认45秒)
  • heartbeat.interval.ms:心跳发送频率

⚠️ 关键原则heartbeat.interval.ms必须小于session.timeout.ms(通常设为后者的33%)

配置权衡

  • 降低超时时间:加快故障恢复速度,提升可用性
  • 过低超时时间:在Kafka 0.10.1.0前可能因消息处理阻塞导致误判(已通过KIP-62修复)
  • 过高超时时间:故障检测延迟,但可避免旧版本误判问题

5.2. 最大轮询间隔时间

max.poll.interval.ms指定消费者空闲的最大时间(默认5分钟)。超时后消费者停止发送心跳,直到会话超时离开组。

配置影响

  • 增加间隔时间:减少低吞吐量环境下的不必要重平衡
  • 过大间隔时间:增加空闲消费者数量,提高基础设施成本

6. 结论

本文深入探讨了Kafka消费者组的核心机制:

  • 组协调器和领导者的角色分工
  • 已提交偏移量的管理方式
  • 分区重平衡的触发条件与影响

通过实践演示了消费者离开时的自动重平衡过程,并分析了关键配置参数的权衡。理解这些机制对优化Kafka应用性能至关重要,特别是在高可用性和资源效率之间取得平衡时。

本文源码可在GitHub获取。


原始标题:Manage Kafka Consumer Groups