1. 概述
Kafka 是一个广泛使用的开源分布式消息流中间件。它通过 发布-订阅模式 解耦消息的生产者和消费者。
Kafka 有大量配置参数,其中 log.segment.bytes 和 log.retention.hours 与 Kafka 中消息的保留机制密切相关。
本文将深入探讨这两个参数的区别,并通过实际操作演示它们的作用。文中使用的是 Kafka 3.7.0 版本。
2. log.segment.bytes 与 log.retention.hours 的含义
Kafka 将消息存储在 分区 中。一个 Topic 可以有多个分区,相同 key 的消息会被分配到同一个分区中。若消息没有 key,则会被随机分配到某个分区。
每个分区由多个段(Segment)组成。每个段包含不同的 offset 范围。Kafka 按顺序将消息写入段中。每个分区只有一个“活跃段”用于接收新消息,活跃段是该分区最新的那个段。
log.segment.bytes
- 作用:指定单个日志段的最大大小(单位为字节)
- 默认值:1 GB(1073741824 bytes)
- 行为:当当前段达到该大小时,Kafka 会关闭该段并创建一个新的段作为活跃段
- 文件结构:每个段在文件系统中对应一个 log 文件
log.retention.hours 与 log.retention.ms
- 作用:指定 Kafka 保留旧段的时间长度。旧段关闭后不会立即删除,而是等待指定时间后才会被清理
- 默认值:log.retention.hours 默认为 168 小时(即 7 天),log.retention.ms 默认未设置
- 优先级:如果同时设置了这两个参数,log.retention.ms 优先级更高
3. 修改参数的影响
通常情况下,我们无需修改这些参数。但在特定场景下进行调整时,需注意以下影响:
log.segment.bytes 设置过小
- 段数量增加:每个分区会生成更多段
- 文件句柄压力增大:Kafka 需要同时处理更多打开的文件
- 可能报错:如 Too many open files
log.retention.hours 设置过高
- 磁盘空间占用增加:保留时间越长,数据越多
- 数据可恢复窗口变大:对需要回溯历史数据的消费者有利
log.retention.hours 设置过低
- 数据保留时间短:可能导致晚加入的消费者无法读取历史数据
- 磁盘压力小:适合对数据时效性要求高的场景
4. 实操示例
我们通过实际操作验证这两个参数的行为。
4.1 启动 Kafka 服务
使用 Kafka Raft 模式启动服务:
# 生成 cluster id
$ kafka-storage.sh random-uuid
VkjEOXEjTieLimFLjhvRKA
# 格式化存储目录
$ kafka-storage.sh format -t VkjEOXEjTieLimFLjhvRKA -c /home/baeldung/work/kafka/config/kraft/server.properties
# 启动 Kafka
$ kafka-server-start.sh /home/baeldung/work/kafka/config/kraft/server.properties
服务启动后,默认监听 localhost:9092
。
4.2 动态修改 log.segment.bytes
使用 kafka-configs.sh
动态修改 broker 配置:
# 查看当前配置
$ kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --describe
# 修改 log.segment.bytes 为 128 字节
$ kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --alter --add-config log.segment.bytes=128
此时所有 topic 的段大小将被限制为 128 字节。
4.3 创建 Topic
$ kafka-topics.sh --bootstrap-server localhost:9092 --topic my-topic --create
Topic 创建后,在 /tmp/kraft-combined-logs/my-topic-0/
目录下生成对应的日志文件:
00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
...
初始状态下 .log
文件大小为 0。
4.4 写入消息
启动生产者并发送两条消息:
$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic
>Message1
>Message2
写入消息后观察日志文件变化:
- 第一条消息写入
00000000000000000000.log
,大小变为 76 字节 - 第二条消息触发段切换,写入
00000000000000000001.log
,大小也为 76 字节
✅ 结论:由于段大小限制为 128 字节,第二条消息导致段满,触发新段创建。
4.5 消费消息
启动消费者读取所有消息:
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
Message1
Message2
✅ 成功读取:说明消息在段中正常保留。
4.6 修改 log.retention.ms
将保留时间设为 5 秒:
$ kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --alter --add-config log.retention.ms=5000
再次读取消息:
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
❌ 未读取到消息:说明旧段已被删除。
查看日志目录:
00000000000000000002.log
00000000000000000002.snapshot
00000000000000000002.timeindex
...
✅ 结论:旧段已被删除,新段已创建,但尚未写入消息。
5. 总结
本文详细解析了 Kafka 中两个重要参数:
参数名 | 作用 | 默认值 | 注意事项 |
---|---|---|---|
log.segment.bytes | 控制段大小 | 1 GB | 过小会导致频繁段切换和文件句柄压力 |
log.retention.hours | 控制段保留时间 | 168 小时 | 与 log.retention.ms 冲突时后者优先 |
log.retention.ms | 同上,单位为毫秒(更精细) | 未设置 | 更适合测试场景 |
通过实际操作验证了:
- 段切换行为:当日志段满时会创建新段
- 消息保留机制:关闭的段在保留时间后会被删除
在生产环境中调整这些参数前,务必评估其对性能和磁盘空间的影响。