1. 概述

Apache ZooKeeper 是一个分布式协调服务,能显著简化分布式应用的开发。它被 Apache Hadoop、HBase 等众多项目用于以下场景:

  • 领导者选举
  • 配置管理
  • 节点协调
  • 服务器租约管理

ZooKeeper 集群中的节点将数据存储在共享的分层命名空间中,结构类似标准文件系统或树形数据结构。

本文将探讨如何使用 ZooKeeper 的 Java API 实现数据的存储、更新和删除操作。

2. 环境搭建

ZooKeeper Java 库的最新版本可通过以下 Maven 依赖引入:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.11</version>
</dependency>

3. ZooKeeper 数据模型——ZNode

ZooKeeper 采用分层命名空间存储协调数据(如状态信息、位置信息等),每个节点称为 ZNode

每个 ZNode 的关键特性:

  • ✅ 维护数据/ACL 变更的版本号和时间戳
  • ✅ 支持缓存验证和更新协调
  • ⚠️ 类似文件系统的树形结构

4. 安装指南

4.1. 基础安装

官方下载页 获取最新版本。安装前请确认满足 系统要求

4.2. 独立模式

本文采用独立模式(最小配置):

  1. 参考 官方文档 操作
  2. ⚠️ 注意:此模式无数据复制,进程崩溃将导致服务中断

5. ZooKeeper CLI 实战

通过命令行连接本地 ZooKeeper 实例:

bin/zkCli.sh -server 127.0.0.1:2181

创建 ZNode

[zk: localhost:2181(CONNECTED) 0] create /MyFirstZNode ZNodeVal
Created /FirstZnode
  • 在根路径创建持久化 ZNode
  • 存储初始值 ZNodeVal

读取 ZNode

[zk: localhost:2181(CONNECTED) 1] get /FirstZnode

“Myfirstzookeeper-app”
cZxid = 0x7f
ctime = Sun Feb 18 16:15:47 IST 2018
mZxid = 0x7f
mtime = Sun Feb 18 16:15:47 IST 2018
pZxid = 0x7f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 22
numChildren = 0

更新 ZNode

set /MyFirstZNode ZNodeValUpdated
  • 将数据从 ZNodeVal 更新为 ZNodeValUpdated

6. Java API 开发实战

6.1. 核心包说明

ZooKeeper Java API 主要包含两个关键包:

  1. org.apache.zookeeper
    • 定义客户端库主类 ZooKeeper
    • 包含事件类型和状态的静态定义
  2. org.apache.zookeeper.data
    • 定义 ZNode 特性(ACL、ID、统计信息等)

服务端实现包(如 org.apache.zookeeper.server)超出本文范围

6.2. 连接管理

创建 ZKConnection 类处理连接生命周期:

public class ZKConnection {
    private ZooKeeper zoo;
    CountDownLatch connectionLatch = new CountDownLatch(1);

    public ZooKeeper connect(String host) 
      throws IOException, InterruptedException {
        zoo = new ZooKeeper(host, 2000, new Watcher() {
            public void process(WatchedEvent we) {
                if (we.getState() == KeeperState.SyncConnected) {
                    connectionLatch.countDown();
                }
            }
        });
        connectionLatch.await();
        return zoo;
    }

    public void close() throws InterruptedException {
        zoo.close();
    }
}

关键机制:

  • 使用 CountDownLatch 确保连接建立
  • 会话 ID 分配后需定期发送心跳保活
  • ✅ 连接有效期内可调用所有 API

6.3. 客户端操作

定义操作接口

public interface ZKManager {
    void create(String path, byte[] data) 
      throws KeeperException, InterruptedException;
    Object getZNodeData(String path, boolean watchFlag);
    void update(String path, byte[] data) 
      throws KeeperException, InterruptedException;
}

实现核心功能

public class ZKManagerImpl implements ZKManager {
    private static ZooKeeper zkeeper;
    private static ZKConnection zkConnection;

    public ZKManagerImpl() {
        initialize();
    }

    private void initialize() {
        zkConnection = new ZKConnection();
        zkeeper = zkConnection.connect("localhost");
    }

    public void closeConnection() {
        zkConnection.close();
    }

    @Override
    public void create(String path, byte[] data) 
      throws KeeperException, InterruptedException {
        zkeeper.create(
          path, 
          data, 
          ZooDefs.Ids.OPEN_ACL_UNSAFE, 
          CreateMode.PERSISTENT);
    }

    @Override
    public Object getZNodeData(String path, boolean watchFlag) 
      throws KeeperException, InterruptedException {
        byte[] b = zkeeper.getData(path, null, null);
        return new String(b, "UTF-8");
    }

    @Override
    public void update(String path, byte[] data) 
      throws KeeperException, InterruptedException {
        int version = zkeeper.exists(path, true).getVersion();
        zkeeper.setData(path, data, version);
    }
}

操作要点:

  • 创建:使用 OPEN_ACL_UNSAFE 简化权限(生产环境需谨慎)
  • 读取:直接获取字节数据并转为字符串
  • 更新:通过版本号实现乐观锁机制
  • ❌ 踩坑提示:更新时必须传入当前版本,否则会抛出异常

7. 总结

ZooKeeper 在分布式系统中扮演关键角色,特别适用于:

  • 共享配置存储
  • 主节点选举
  • 分布式锁实现

其 Java API 提供了优雅的客户端交互方式,通过 ZNode 的版本控制机制确保数据一致性。完整示例代码可在 GitHub 获取。


原始标题:Getting Started with Java and Zookeeper | Baeldung