1. 概述
Kafka消费者偏移量(offset)是一个唯一且单调递增的整数,用于标识事件记录在分区中的位置。消费者组中的每个消费者都会为每个分区维护特定的偏移量,以跟踪消费进度。另一方面,Kafka消费者组由多个消费者组成,它们通过轮询机制负责从主题的多个分区读取消息。
Kafka中的组协调器(group coordinator)负责管理消费者组,并将分区分配给组内的消费者。当消费者启动时,它会定位所在组的协调器并请求加入。协调器会触发再平衡(rebalance),为新成员分配其应处理的分区。
本文将探讨这些偏移量的存储位置,以及消费者如何利用它们来跟踪、启动或恢复消费进度。
2. 环境搭建
首先使用Docker Compose脚本在Kraft模式下搭建单实例Kafka集群:
broker:
image: confluentinc/cp-kafka:7.7.0
hostname: broker
container_name: broker
ports:
- "9092:9092"
- "9101:9101"
expose:
- '29092'
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
KAFKA_LOG_CLEANUP_POLICY: 'delete'
集群将在 http://localhost:9092/ 可用。
接下来创建包含两个分区的主题:
init-kafka:
image: confluentinc/cp-kafka:7.7.0
depends_on:
- broker
entrypoint: [ '/bin/sh', '-c' ]
command: |
" # blocks until kafka is reachable
kafka-topics --bootstrap-server broker:29092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server broker:29092 --create \
--if-not-exists --topic user-data --partitions 2 "
可选步骤:配置Kafka UI方便查看消息(本文主要使用CLI操作):
kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- "3030:8080"
depends_on:
- broker
- init-kafka
environment:
KAFKA_CLUSTERS_0_NAME: broker
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker:29092
Kafka UI将在 http://localhost:3030/ 可用:
3. 基于配置的偏移量参考
*当消费者首次加入组时,会根据 auto.offset.reset 配置(设置为 earliest 或 latest)确定偏移量位置来获取记录。*
先作为生产者发送几条消息:
docker exec -i kafka_broker kafka-console-producer \
--broker-list localhost:9092 \
--topic user-data <<< '{"id": 1, "first_name": "John", "last_name": "Doe"}
{"id": 2, "first_name": "Alice", "last_name": "Johnson"}'
然后注册消费者读取消息,设置 auto.offset.reset=earliest:
docker exec -it kafka_broker kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic user-data \
--consumer-property auto.offset.reset=earliest \
--group consumer-user-data
这会向 consumer-user-data 组添加新消费者。可在Broker日志和Kafka UI中观察到再平衡过程,消费者会根据 earliest 策略列出所有消息。
⚠️ 注意:消费者会保持终端打开持续消费消息。为测试中断后的行为,需终止此会话。
4. 基于主题的偏移量参考
当消费者加入组时,Broker会创建内部主题 __consumer_offsets 存储消费者偏移量状态(按主题和分区级别)。 如果启用了Kafka的自动提交(auto-commit),消费者会定期将最后处理的消息偏移量提交到此主题,便于中断后恢复消费。
当组内消费者因崩溃或断开连接而失效时,Kafka会检测到心跳丢失并触发再平衡,将失效消费者的分区重新分配给活跃消费者,确保消息持续消费。内部主题中的持久化状态用于恢复消费。
首先验证内部主题中的已提交偏移量状态:
docker exec -it kafka_broker kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic __consumer_offsets \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--from-beginning
此脚本使用特定格式提高可读性(默认为二进制格式),输出显示消费者组(consumer-user-data)、主题(user-data)、分区(0和1)及偏移量元数据(offset=2):
[consumer-user-data,user-data,0]::OffsetAndMetadata(offset=2, leaderEpoch=Optional[0], metadata=, commitTimestamp=1726601656308, expireTimestamp=None)
[consumer-user-data,user-data,1]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1726601661314, expireTimestamp=None)
✅ 分区 0 已接收所有消息,消费者提交了状态用于跟踪进度/恢复。
接下来验证恢复行为:作为生产者发送额外消息:
docker exec -i kafka_broker kafka-console-producer \
--broker-list localhost:9092 \
--topic user-data <<< '{"id": 3, "first_name": "Alice", "last_name": "Johnson"}
{"id": 4, "first_name": "Michael", "last_name": "Brown"}'
然后重启之前终止的消费者,检查是否从最后已知偏移量恢复消费:
docker exec -it kafka_broker kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic user-data \
--consumer-property auto.offset.reset=earliest \
--group consumer-user-data
即使设置了 auto.offset.reset=earliest,消费者仍会输出用户ID 3 和 4 的记录,因为偏移量状态存储在内部主题中。最后再次运行命令验证 __consumer_offsets 主题状态:
[consumer-user-data, user-data, 1] :: OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1726611172398, expireTimestamp=None)
[consumer-user-data, user-data, 0] :: OffsetAndMetadata(offset=4, leaderEpoch=Optional[0], metadata=, commitTimestamp=1726611172398, expireTimestamp=None)
✅ __consumer_offsets 主题已更新提交的偏移量(值为 4),由于状态保留在主题中,消费者从最后提交的偏移量恢复消费。
5. 总结
本文探讨了Kafka如何管理消费者偏移量,以及消费者首次加入组时 auto.offset.reset 属性的工作机制。我们还学习了内部主题 __consumer_offsets 的状态如何用于暂停或中断后恢复消费。
关键要点:
- 偏移量是分区中消息位置的唯一标识符
- 消费者组通过再平衡分配分区
- 首次消费时通过 auto.offset.reset 确定起始位置
- 内部主题 __consumer_offsets 持久化存储偏移量状态
- 自动提交机制确保消费进度可恢复
踩坑提醒:生产环境中需谨慎配置自动提交策略,避免消息丢失或重复消费!