1. 概述

本文将介绍 Apache BookKeeper,一个实现分布式、高容错日志存储系统的服务。它被设计用于需要高吞吐、低延迟和强一致性的场景,比如分布式日志、消息队列和流式处理系统。

2. 什么是 BookKeeper?

BookKeeper 最初由 Yahoo 开发,作为 ZooKeeper 的子项目,2015 年成为 Apache 顶级项目。其核心目标是提供一个可靠且高性能的系统,用于存储称为 Log Entries(也叫 Records)的有序数据序列,这些数据被组织在名为 Ledgers 的数据结构中。

Ledger 的关键特性是:仅支持追加(append-only)且不可变(immutable)。这使得 BookKeeper 非常适合以下场景:

  • 分布式日志系统(如 Apache DistributedLog)
  • 发布-订阅(Pub-Sub)消息系统
  • 实时流处理(Streaming)

⚠️ 正因为不可变,你不能把它当普通文件或数据库表来“更新”某条记录——这是踩坑高发区。

3. 核心概念

3.1. 日志条目(Log Entries)

日志条目是客户端写入或读取 BookKeeper 的最小数据单元。每个条目除了包含原始数据外,BookKeeper 还会自动附加一些元数据:

  • entryId:在同一个 Ledger 内唯一
  • 认证码(Authentication Code):用于检测条目是否损坏或被篡改

⚠️ BookKeeper 不提供序列化功能。客户端需要自行将对象序列化为 byte[],反序列化也一样。别指望它帮你处理 POJO,这是你的责任。

3.2. Ledger(账本)

Ledger 是 BookKeeper 的基本存储单元,用来存储有序的日志条目序列。

关键行为:

  • ✅ 仅支持追加写入,不能修改或删除已有条目
  • ✅ 客户端关闭 Ledger 后,BookKeeper 会将其“封存”(seal),之后再也无法写入,哪怕重启也不行

⚠️ 设计建议
不要直接用 Ledger 实现队列、KV 存储等高级结构。它更像“日志分段”(log segment)。比如 Apache DistributedLog 就用多个 Ledger 组合成一个逻辑日志,对用户透明。

高可用与副本机制

BookKeeper 通过在多个 Bookie 上复制条目来保证高可用。三个关键参数控制副本策略:

参数 说明
Ensemble Size 参与写入的 Bookie 总数
Write Quorum Size 单个条目必须写入成功的 Bookie 数量
Ack Quorum Size 必须返回确认(ack)的 Bookie 数量

🌰 举例:
设置为 3, 2, 2 表示:3 个 Bookie 参与,条目需写入至少 2 个,且至少 2 个确认才算成功。这样可容忍 1 个节点故障。

自定义元数据

创建 Ledger 时可传入 Map<String, byte[]> 类型的自定义元数据,BookKeeper 会将其与内部元数据一起存入 ZooKeeper。

3.3. Bookie(存储节点)

Bookie 是实际存储 Ledger 数据的服务器进程。一个 BookKeeper 集群由多个 Bookie 组成,通过 TCP 或 TLS 对外提供服务。

📌 依赖 ZooKeeper
Bookie 依赖 ZooKeeper 进行集群协调和元数据管理。

最小高可用部署建议

  • 至少 3 个 ZooKeeper 节点
  • 至少 3 个 Bookie 节点

这样在 ensemble=3, writeQuorum=2, ackQuorum=2 的配置下,可容忍任意单点故障。

4. 本地环境搭建

本地测试只需一个 ZooKeeper 和至少一个 Bookie。虽然可以手动部署,但推荐使用 docker-compose 快速启动:

$ cd <path to docker-compose.yml>
$ docker-compose up

该配置会启动 1 个 ZooKeeper + 3 个 Bookie(均在同一 Docker 主机)。仅用于测试,生产环境需跨物理机部署。

启动后,用 BookKeeper 自带命令检查 Bookie 状态:

$ docker exec -it apache-bookkeeper_bookie_1 /opt/bookkeeper/bin/bookkeeper \
  shell listbookies -readwrite

正常输出示例:

ReadWrite Bookies :
192.168.99.101(192.168.99.101):4181
192.168.99.101(192.168.99.101):4182
192.168.99.101(192.168.99.101):3181

IP 地址会因 Docker 网络配置而异,只要列出 3 个 Bookie 即表示集群正常。

5. 使用 Ledger API

Ledger API 是操作 BookKeeper 的底层接口,直接面向 Ledger 对象。它不提供流(Stream)等高级抽象——这类需求建议使用 DistributedLog

首先引入依赖:

<dependency>
    <groupId>org.apache.bookkeeper</groupId>
    <artifactId>bookkeeper-server</artifactId>
    <version>4.10.0</version>
</dependency>

⚠️ 注意:该依赖会引入 Protobuf 和 Guava。如果你项目中版本冲突,建议使用 shaded 版本:

<dependency>
    <groupId>org.apache.bookkeeper</groupId>
    <artifactId>bookkeeper-server-shaded</artifactId>
    <version>4.10.0</version>
</dependency>

5.1. 连接 BookKeeper

BookKeeper 类是 Ledger API 的入口。

简单连接

BookKeeper client = new BookKeeper("zookeeper-host:2131");

zookeeper-host 替换为你的 ZooKeeper 地址,本地测试通常是 localhost 或 Docker 主机 IP。

自定义配置

如需调整参数(如超时、重试等),使用 ClientConfiguration

ClientConfiguration cfg = new ClientConfiguration();
cfg.setMetadataServiceUri("zk+null://zookeeper-host:2131");
// ... 其他配置项

BookKeeper bk = BookKeeper.forConfig(cfg).build();

5.2. 创建 Ledger

创建 Ledger 有同步、异步和 Fluent API 三种方式。

同步创建(最简单)

LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.MAC, "password".getBytes());

使用默认配置(ensemble=3, writeQuorum=2, ackQuorum=2),MAC 摘要类型用于校验完整性。

带自定义参数和元数据

LedgerHandle lh = bk.createLedger(
  3,                                  // ensemble size
  2,                                  // write quorum
  2,                                  // ack quorum
  DigestType.MAC,
  "password".getBytes(),
  Collections.singletonMap("name", "my-ledger".getBytes())
);

异步创建(Callback 风格)

bk.asyncCreateLedger(
  3, 2, 2,
  BookKeeper.DigestType.MAC, "passwd".getBytes(),
  (rc, lh, ctx) -> {
      if (rc == BKException.Code.OK) {
          // 使用 lh 操作 Ledger
      }
  },
  null,
  Collections.emptyMap());

Fluent API + CompletableFuture(推荐)

CompletableFuture<WriteHandle> cf = bk.newCreateLedgerOp()
  .withEnsembleSize(3)
  .withWriteQuorumSize(2)
  .withAckQuorumSize(2)
  .withDigestType(org.apache.bookkeeper.client.api.DigestType.MAC)
  .withPassword("password".getBytes())
  .withCustomMetadata(Collections.singletonMap("app", "demo".getBytes()))
  .execute();

📌 注意:Fluent API 返回的是 WriteHandle,但它和 LedgerHandle 都实现了写操作接口,可互换使用。

5.3. 写入数据

获取 LedgerHandleWriteHandle 后,使用 append 方法写入。

同步写入

for(int i = 0; i < MAX_MESSAGES; i++) {
    byte[] data = ("message-" + i).getBytes();
    lh.append(data);
}

API 也支持 ByteBuf(Netty)和 ByteBuffer(NIO),利于高性能场景的内存管理。

异步写入

// CompletableFuture 风格(推荐)
CompletableFuture<Long> future = lh.appendAsync(data);
future.thenAccept(entryId -> {
    // 写入成功,entryId 是该条目的 ID
});

// Callback 风格(仅 LedgerHandle 支持)
lh.asyncAddEntry(
  data,
  (rc, ledgerHandle, entryId, ctx) -> {
      if (rc == BKException.Code.OK) {
          // 成功
      }
  },
  null);

✅ 建议优先使用 CompletableFuture,代码更清晰,且易于与 Reactor(如 Mono.fromFuture())集成。

5.4. 读取数据

读取需先打开 Ledger。

同步打开

LedgerHandle lh = bk.openLedger(
  ledgerId, 
  BookKeeper.DigestType.MAC,
  "password".getBytes());

⚠️ openLedger() 返回的是只读句柄,调用 append() 会抛异常。

Fluent API 打开(推荐)

ReadHandle rh = bk.newOpenLedgerOp()
  .withLedgerId(ledgerId)
  .withDigestType(DigestType.MAC)
  .withPassword("password".getBytes())
  .execute()
  .get(); // 注意:.get() 是阻塞的

读取条目

long lastId = rh.readLastConfirmed(); // 获取最后确认的条目 ID
Enumeration<LedgerEntry> entries = rh.read(0, lastId); // 同步读取

while (entries.hasMoreElements()) {
    LedgerEntry entry = entries.nextElement();
    byte[] data = entry.getEntry();
    // 处理数据
}

异步读取

rh.readAsync(0, lastId).thenAccept((entries) -> {
    entries.forEachRemaining((entry) -> {
        // 处理 entry
    });
});

传统异步方式(Callback)

lh.asyncReadEntries(
  0, lastId,
  (rc, lh, entries, ctx) -> {
      if (rc == BKException.Code.OK) {
          while(entries.hasMoreElements()) {
              LedgerEntry e = entries.nextElement();
              // 处理
          }
      }
  },
  null);

5.5. 列出所有 Ledger

要读取 Ledger,必须知道其 ledgerId。可以通过 LedgerManager 获取所有 Ledger ID。

public List<Long> listAllLedgers(BookKeeper bk) {
    List<Long> ledgers = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch processDone = new CountDownLatch(1);

    bk.getLedgerManager().asyncProcessLedgers(
        (ledgerId, cb) -> {
            ledgers.add(ledgerId);
            cb.processResult(BKException.Code.OK, null, null); // 必须调用
        }, 
        (rc, s, obj) -> {
            processDone.countDown(); // 所有处理完成
        },
        null,
        BKException.Code.OK,
        BKException.Code.ReadException);
 
    try {
        processDone.await(1, TimeUnit.MINUTES);
        return ledgers;
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted while listing ledgers", ie);
    }
}

⚠️ 关键点:

  • 第一个回调处理每个 Ledger ID,必须调用 cb.processResult() 通知框架继续
  • 第二个回调在所有 Ledger 处理完或出错时调用
  • 使用 synchronizedList 防止多线程并发问题

6. 总结

本文介绍了 Apache BookKeeper 的核心概念,并通过 Ledger API 演示了如何创建、写入、读取和管理 Ledger。

📌 核心要点回顾:

  • Ledger 是 append-only、immutable 的日志单元
  • 高可用依赖 ensemble/write/ack quorum 配置
  • 生产环境务必部署多节点 Bookie + ZooKeeper
  • 优先使用 Fluent API 和 CompletableFuture
  • 不要直接用 Ledger 实现复杂数据结构

文中所有代码示例均可在 GitHub 获取:
https://github.com/eugenp/tutorials/tree/master/persistence-modules/apache-bookkeeper


原始标题:Guide to Apache BookKeeper