1. 概述

Kafka消费者偏移量(offset)是一个唯一且单调递增的整数,用于标识事件记录在分区中的位置。消费者组中的每个消费者都会为每个分区维护特定的偏移量,以跟踪消费进度。另一方面,Kafka消费者组由多个消费者组成,它们通过轮询机制负责从主题的多个分区读取消息。

Kafka中的组协调器(group coordinator)负责管理消费者组,并将分区分配给组内的消费者。当消费者启动时,它会定位所在组的协调器并请求加入。协调器会触发再平衡(rebalance),为新成员分配其应处理的分区。

本文将探讨这些偏移量的存储位置,以及消费者如何利用它们来跟踪、启动或恢复消费进度。

2. 环境搭建

首先使用Docker Compose脚本在Kraft模式下搭建单实例Kafka集群

broker:
  image: confluentinc/cp-kafka:7.7.0
  hostname: broker
  container_name: broker
  ports:
    - "9092:9092"
    - "9101:9101"
  expose:
    - '29092'
  environment:
    KAFKA_NODE_ID: 1
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
    KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    KAFKA_JMX_PORT: 9101
    KAFKA_JMX_HOSTNAME: localhost
    KAFKA_PROCESS_ROLES: 'broker,controller'
    KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
    KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
    KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
    KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
    KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    KAFKA_LOG_CLEANUP_POLICY: 'delete'

集群将在 http://localhost:9092/ 可用。

接下来创建包含两个分区的主题:

init-kafka:
  image: confluentinc/cp-kafka:7.7.0
  depends_on:
    - broker
  entrypoint: [ '/bin/sh', '-c' ]
  command: |
    " # blocks until kafka is reachable
    kafka-topics --bootstrap-server broker:29092 --list
    echo -e 'Creating kafka topics'
    kafka-topics --bootstrap-server broker:29092 --create \
      --if-not-exists --topic user-data --partitions 2 "

可选步骤:配置Kafka UI方便查看消息(本文主要使用CLI操作):

kafka-ui:
  image: provectuslabs/kafka-ui:latest
  ports:
    - "3030:8080"
  depends_on:
    - broker
    - init-kafka
  environment:
    KAFKA_CLUSTERS_0_NAME: broker
    KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker:29092

Kafka UI将在 http://localhost:3030/ 可用:

kafka ui

3. 基于配置的偏移量参考

*当消费者首次加入组时,会根据 auto.offset.reset 配置(设置为 earliestlatest)确定偏移量位置来获取记录。*

先作为生产者发送几条消息:

docker exec -i kafka_broker kafka-console-producer \
  --broker-list localhost:9092 \
  --topic user-data <<< '{"id": 1, "first_name": "John", "last_name": "Doe"}
{"id": 2, "first_name": "Alice", "last_name": "Johnson"}'

然后注册消费者读取消息,设置 auto.offset.reset=earliest

docker exec -it kafka_broker kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic user-data \
  --consumer-property auto.offset.reset=earliest \
  --group consumer-user-data

这会向 consumer-user-data 组添加新消费者。可在Broker日志和Kafka UI中观察到再平衡过程,消费者会根据 earliest 策略列出所有消息。

⚠️ 注意:消费者会保持终端打开持续消费消息。为测试中断后的行为,需终止此会话。

4. 基于主题的偏移量参考

当消费者加入组时,Broker会创建内部主题 __consumer_offsets 存储消费者偏移量状态(按主题和分区级别)。 如果启用了Kafka的自动提交(auto-commit),消费者会定期将最后处理的消息偏移量提交到此主题,便于中断后恢复消费。

当组内消费者因崩溃或断开连接而失效时,Kafka会检测到心跳丢失并触发再平衡,将失效消费者的分区重新分配给活跃消费者,确保消息持续消费。内部主题中的持久化状态用于恢复消费。

首先验证内部主题中的已提交偏移量状态:

docker exec -it kafka_broker kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic __consumer_offsets \
  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
  --from-beginning

此脚本使用特定格式提高可读性(默认为二进制格式),输出显示消费者组(consumer-user-data)、主题(user-data)、分区(01)及偏移量元数据(offset=2):

[consumer-user-data,user-data,0]::OffsetAndMetadata(offset=2, leaderEpoch=Optional[0], metadata=, commitTimestamp=1726601656308, expireTimestamp=None)
[consumer-user-data,user-data,1]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1726601661314, expireTimestamp=None)

✅ 分区 0 已接收所有消息,消费者提交了状态用于跟踪进度/恢复。

接下来验证恢复行为:作为生产者发送额外消息:

docker exec -i kafka_broker kafka-console-producer \
  --broker-list localhost:9092 \
  --topic user-data <<< '{"id": 3, "first_name": "Alice", "last_name": "Johnson"} 
{"id": 4, "first_name": "Michael", "last_name": "Brown"}'

然后重启之前终止的消费者,检查是否从最后已知偏移量恢复消费:

docker exec -it kafka_broker kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic user-data \
  --consumer-property auto.offset.reset=earliest \
  --group consumer-user-data

即使设置了 auto.offset.reset=earliest,消费者仍会输出用户ID 34 的记录,因为偏移量状态存储在内部主题中。最后再次运行命令验证 __consumer_offsets 主题状态:

[consumer-user-data, user-data, 1] :: OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1726611172398, expireTimestamp=None)
[consumer-user-data, user-data, 0] :: OffsetAndMetadata(offset=4, leaderEpoch=Optional[0], metadata=, commitTimestamp=1726611172398, expireTimestamp=None)

__consumer_offsets 主题已更新提交的偏移量(值为 4),由于状态保留在主题中,消费者从最后提交的偏移量恢复消费。

5. 总结

本文探讨了Kafka如何管理消费者偏移量,以及消费者首次加入组时 auto.offset.reset 属性的工作机制。我们还学习了内部主题 __consumer_offsets 的状态如何用于暂停或中断后恢复消费。

关键要点:

  • 偏移量是分区中消息位置的唯一标识符
  • 消费者组通过再平衡分配分区
  • 首次消费时通过 auto.offset.reset 确定起始位置
  • 内部主题 __consumer_offsets 持久化存储偏移量状态
  • 自动提交机制确保消费进度可恢复

踩坑提醒:生产环境中需谨慎配置自动提交策略,避免消息丢失或重复消费!


原始标题:Understanding Kafka Consumer Offset | Baeldung