1. 概述
本文将介绍 Apache BookKeeper,一个实现分布式、高容错日志存储系统的服务。它被设计用于需要高吞吐、低延迟和强一致性的场景,比如分布式日志、消息队列和流式处理系统。
2. 什么是 BookKeeper?
BookKeeper 最初由 Yahoo 开发,作为 ZooKeeper 的子项目,2015 年成为 Apache 顶级项目。其核心目标是提供一个可靠且高性能的系统,用于存储称为 Log Entries(也叫 Records)的有序数据序列,这些数据被组织在名为 Ledgers 的数据结构中。
✅ Ledger 的关键特性是:仅支持追加(append-only)且不可变(immutable)。这使得 BookKeeper 非常适合以下场景:
- 分布式日志系统(如 Apache DistributedLog)
- 发布-订阅(Pub-Sub)消息系统
- 实时流处理(Streaming)
⚠️ 正因为不可变,你不能把它当普通文件或数据库表来“更新”某条记录——这是踩坑高发区。
3. 核心概念
3.1. 日志条目(Log Entries)
日志条目是客户端写入或读取 BookKeeper 的最小数据单元。每个条目除了包含原始数据外,BookKeeper 还会自动附加一些元数据:
entryId
:在同一个 Ledger 内唯一- 认证码(Authentication Code):用于检测条目是否损坏或被篡改
⚠️ BookKeeper 不提供序列化功能。客户端需要自行将对象序列化为 byte[]
,反序列化也一样。别指望它帮你处理 POJO,这是你的责任。
3.2. Ledger(账本)
Ledger 是 BookKeeper 的基本存储单元,用来存储有序的日志条目序列。
✅ 关键行为:
- ✅ 仅支持追加写入,不能修改或删除已有条目
- ✅ 客户端关闭 Ledger 后,BookKeeper 会将其“封存”(seal),之后再也无法写入,哪怕重启也不行
⚠️ 设计建议:
不要直接用 Ledger 实现队列、KV 存储等高级结构。它更像“日志分段”(log segment)。比如 Apache DistributedLog 就用多个 Ledger 组合成一个逻辑日志,对用户透明。
高可用与副本机制
BookKeeper 通过在多个 Bookie 上复制条目来保证高可用。三个关键参数控制副本策略:
参数 | 说明 |
---|---|
Ensemble Size | 参与写入的 Bookie 总数 |
Write Quorum Size | 单个条目必须写入成功的 Bookie 数量 |
Ack Quorum Size | 必须返回确认(ack)的 Bookie 数量 |
🌰 举例:
设置为 3, 2, 2
表示:3 个 Bookie 参与,条目需写入至少 2 个,且至少 2 个确认才算成功。这样可容忍 1 个节点故障。
自定义元数据
创建 Ledger 时可传入 Map<String, byte[]>
类型的自定义元数据,BookKeeper 会将其与内部元数据一起存入 ZooKeeper。
3.3. Bookie(存储节点)
Bookie 是实际存储 Ledger 数据的服务器进程。一个 BookKeeper 集群由多个 Bookie 组成,通过 TCP 或 TLS 对外提供服务。
📌 依赖 ZooKeeper:
Bookie 依赖 ZooKeeper 进行集群协调和元数据管理。
✅ 最小高可用部署建议:
- 至少 3 个 ZooKeeper 节点
- 至少 3 个 Bookie 节点
这样在 ensemble=3, writeQuorum=2, ackQuorum=2
的配置下,可容忍任意单点故障。
4. 本地环境搭建
本地测试只需一个 ZooKeeper 和至少一个 Bookie。虽然可以手动部署,但推荐使用 docker-compose
快速启动:
$ cd <path to docker-compose.yml>
$ docker-compose up
该配置会启动 1 个 ZooKeeper + 3 个 Bookie(均在同一 Docker 主机)。仅用于测试,生产环境需跨物理机部署。
启动后,用 BookKeeper 自带命令检查 Bookie 状态:
$ docker exec -it apache-bookkeeper_bookie_1 /opt/bookkeeper/bin/bookkeeper \
shell listbookies -readwrite
正常输出示例:
ReadWrite Bookies :
192.168.99.101(192.168.99.101):4181
192.168.99.101(192.168.99.101):4182
192.168.99.101(192.168.99.101):3181
IP 地址会因 Docker 网络配置而异,只要列出 3 个 Bookie 即表示集群正常。
5. 使用 Ledger API
Ledger API 是操作 BookKeeper 的底层接口,直接面向 Ledger 对象。它不提供流(Stream)等高级抽象——这类需求建议使用 DistributedLog。
首先引入依赖:
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server</artifactId>
<version>4.10.0</version>
</dependency>
⚠️ 注意:该依赖会引入 Protobuf 和 Guava。如果你项目中版本冲突,建议使用 shaded 版本:
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server-shaded</artifactId>
<version>4.10.0</version>
</dependency>
5.1. 连接 BookKeeper
BookKeeper
类是 Ledger API 的入口。
简单连接
BookKeeper client = new BookKeeper("zookeeper-host:2131");
zookeeper-host
替换为你的 ZooKeeper 地址,本地测试通常是 localhost
或 Docker 主机 IP。
自定义配置
如需调整参数(如超时、重试等),使用 ClientConfiguration
:
ClientConfiguration cfg = new ClientConfiguration();
cfg.setMetadataServiceUri("zk+null://zookeeper-host:2131");
// ... 其他配置项
BookKeeper bk = BookKeeper.forConfig(cfg).build();
5.2. 创建 Ledger
创建 Ledger 有同步、异步和 Fluent API 三种方式。
同步创建(最简单)
LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.MAC, "password".getBytes());
使用默认配置(ensemble=3, writeQuorum=2, ackQuorum=2),MAC 摘要类型用于校验完整性。
带自定义参数和元数据
LedgerHandle lh = bk.createLedger(
3, // ensemble size
2, // write quorum
2, // ack quorum
DigestType.MAC,
"password".getBytes(),
Collections.singletonMap("name", "my-ledger".getBytes())
);
异步创建(Callback 风格)
bk.asyncCreateLedger(
3, 2, 2,
BookKeeper.DigestType.MAC, "passwd".getBytes(),
(rc, lh, ctx) -> {
if (rc == BKException.Code.OK) {
// 使用 lh 操作 Ledger
}
},
null,
Collections.emptyMap());
Fluent API + CompletableFuture(推荐)
CompletableFuture<WriteHandle> cf = bk.newCreateLedgerOp()
.withEnsembleSize(3)
.withWriteQuorumSize(2)
.withAckQuorumSize(2)
.withDigestType(org.apache.bookkeeper.client.api.DigestType.MAC)
.withPassword("password".getBytes())
.withCustomMetadata(Collections.singletonMap("app", "demo".getBytes()))
.execute();
📌 注意:Fluent API 返回的是 WriteHandle
,但它和 LedgerHandle
都实现了写操作接口,可互换使用。
5.3. 写入数据
获取 LedgerHandle
或 WriteHandle
后,使用 append
方法写入。
同步写入
for(int i = 0; i < MAX_MESSAGES; i++) {
byte[] data = ("message-" + i).getBytes();
lh.append(data);
}
API 也支持 ByteBuf
(Netty)和 ByteBuffer
(NIO),利于高性能场景的内存管理。
异步写入
// CompletableFuture 风格(推荐)
CompletableFuture<Long> future = lh.appendAsync(data);
future.thenAccept(entryId -> {
// 写入成功,entryId 是该条目的 ID
});
// Callback 风格(仅 LedgerHandle 支持)
lh.asyncAddEntry(
data,
(rc, ledgerHandle, entryId, ctx) -> {
if (rc == BKException.Code.OK) {
// 成功
}
},
null);
✅ 建议优先使用 CompletableFuture
,代码更清晰,且易于与 Reactor(如 Mono.fromFuture()
)集成。
5.4. 读取数据
读取需先打开 Ledger。
同步打开
LedgerHandle lh = bk.openLedger(
ledgerId,
BookKeeper.DigestType.MAC,
"password".getBytes());
⚠️ openLedger()
返回的是只读句柄,调用 append()
会抛异常。
Fluent API 打开(推荐)
ReadHandle rh = bk.newOpenLedgerOp()
.withLedgerId(ledgerId)
.withDigestType(DigestType.MAC)
.withPassword("password".getBytes())
.execute()
.get(); // 注意:.get() 是阻塞的
读取条目
long lastId = rh.readLastConfirmed(); // 获取最后确认的条目 ID
Enumeration<LedgerEntry> entries = rh.read(0, lastId); // 同步读取
while (entries.hasMoreElements()) {
LedgerEntry entry = entries.nextElement();
byte[] data = entry.getEntry();
// 处理数据
}
异步读取
rh.readAsync(0, lastId).thenAccept((entries) -> {
entries.forEachRemaining((entry) -> {
// 处理 entry
});
});
传统异步方式(Callback)
lh.asyncReadEntries(
0, lastId,
(rc, lh, entries, ctx) -> {
if (rc == BKException.Code.OK) {
while(entries.hasMoreElements()) {
LedgerEntry e = entries.nextElement();
// 处理
}
}
},
null);
5.5. 列出所有 Ledger
要读取 Ledger,必须知道其 ledgerId
。可以通过 LedgerManager
获取所有 Ledger ID。
public List<Long> listAllLedgers(BookKeeper bk) {
List<Long> ledgers = Collections.synchronizedList(new ArrayList<>());
CountDownLatch processDone = new CountDownLatch(1);
bk.getLedgerManager().asyncProcessLedgers(
(ledgerId, cb) -> {
ledgers.add(ledgerId);
cb.processResult(BKException.Code.OK, null, null); // 必须调用
},
(rc, s, obj) -> {
processDone.countDown(); // 所有处理完成
},
null,
BKException.Code.OK,
BKException.Code.ReadException);
try {
processDone.await(1, TimeUnit.MINUTES);
return ledgers;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while listing ledgers", ie);
}
}
⚠️ 关键点:
- 第一个回调处理每个 Ledger ID,必须调用
cb.processResult()
通知框架继续 - 第二个回调在所有 Ledger 处理完或出错时调用
- 使用
synchronizedList
防止多线程并发问题
6. 总结
本文介绍了 Apache BookKeeper 的核心概念,并通过 Ledger API 演示了如何创建、写入、读取和管理 Ledger。
📌 核心要点回顾:
- Ledger 是 append-only、immutable 的日志单元
- 高可用依赖 ensemble/write/ack quorum 配置
- 生产环境务必部署多节点 Bookie + ZooKeeper
- 优先使用 Fluent API 和
CompletableFuture
- 不要直接用 Ledger 实现复杂数据结构
文中所有代码示例均可在 GitHub 获取:
https://github.com/eugenp/tutorials/tree/master/persistence-modules/apache-bookkeeper