1. 概述
Apache ZooKeeper 是一个分布式协调服务,能显著简化分布式应用的开发。它被 Apache Hadoop、HBase 等众多项目用于以下场景:
- 领导者选举
- 配置管理
- 节点协调
- 服务器租约管理
ZooKeeper 集群中的节点将数据存储在共享的分层命名空间中,结构类似标准文件系统或树形数据结构。
本文将探讨如何使用 ZooKeeper 的 Java API 实现数据的存储、更新和删除操作。
2. 环境搭建
ZooKeeper Java 库的最新版本可通过以下 Maven 依赖引入:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.11</version>
</dependency>
3. ZooKeeper 数据模型——ZNode
ZooKeeper 采用分层命名空间存储协调数据(如状态信息、位置信息等),每个节点称为 ZNode。
每个 ZNode 的关键特性:
- ✅ 维护数据/ACL 变更的版本号和时间戳
- ✅ 支持缓存验证和更新协调
- ⚠️ 类似文件系统的树形结构
4. 安装指南
4.1. 基础安装
4.2. 独立模式
本文采用独立模式(最小配置):
- 参考 官方文档 操作
- ⚠️ 注意:此模式无数据复制,进程崩溃将导致服务中断
5. ZooKeeper CLI 实战
通过命令行连接本地 ZooKeeper 实例:
bin/zkCli.sh -server 127.0.0.1:2181
创建 ZNode
[zk: localhost:2181(CONNECTED) 0] create /MyFirstZNode ZNodeVal
Created /FirstZnode
- 在根路径创建持久化 ZNode
- 存储初始值
ZNodeVal
读取 ZNode
[zk: localhost:2181(CONNECTED) 1] get /FirstZnode
“Myfirstzookeeper-app”
cZxid = 0x7f
ctime = Sun Feb 18 16:15:47 IST 2018
mZxid = 0x7f
mtime = Sun Feb 18 16:15:47 IST 2018
pZxid = 0x7f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 22
numChildren = 0
更新 ZNode
set /MyFirstZNode ZNodeValUpdated
- 将数据从
ZNodeVal
更新为ZNodeValUpdated
6. Java API 开发实战
6.1. 核心包说明
ZooKeeper Java API 主要包含两个关键包:
- org.apache.zookeeper
- 定义客户端库主类
ZooKeeper
- 包含事件类型和状态的静态定义
- 定义客户端库主类
- org.apache.zookeeper.data
- 定义 ZNode 特性(ACL、ID、统计信息等)
服务端实现包(如
org.apache.zookeeper.server
)超出本文范围
6.2. 连接管理
创建 ZKConnection
类处理连接生命周期:
public class ZKConnection {
private ZooKeeper zoo;
CountDownLatch connectionLatch = new CountDownLatch(1);
public ZooKeeper connect(String host)
throws IOException, InterruptedException {
zoo = new ZooKeeper(host, 2000, new Watcher() {
public void process(WatchedEvent we) {
if (we.getState() == KeeperState.SyncConnected) {
connectionLatch.countDown();
}
}
});
connectionLatch.await();
return zoo;
}
public void close() throws InterruptedException {
zoo.close();
}
}
关键机制:
- 使用
CountDownLatch
确保连接建立 - 会话 ID 分配后需定期发送心跳保活
- ✅ 连接有效期内可调用所有 API
6.3. 客户端操作
定义操作接口
public interface ZKManager {
void create(String path, byte[] data)
throws KeeperException, InterruptedException;
Object getZNodeData(String path, boolean watchFlag);
void update(String path, byte[] data)
throws KeeperException, InterruptedException;
}
实现核心功能
public class ZKManagerImpl implements ZKManager {
private static ZooKeeper zkeeper;
private static ZKConnection zkConnection;
public ZKManagerImpl() {
initialize();
}
private void initialize() {
zkConnection = new ZKConnection();
zkeeper = zkConnection.connect("localhost");
}
public void closeConnection() {
zkConnection.close();
}
@Override
public void create(String path, byte[] data)
throws KeeperException, InterruptedException {
zkeeper.create(
path,
data,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
@Override
public Object getZNodeData(String path, boolean watchFlag)
throws KeeperException, InterruptedException {
byte[] b = zkeeper.getData(path, null, null);
return new String(b, "UTF-8");
}
@Override
public void update(String path, byte[] data)
throws KeeperException, InterruptedException {
int version = zkeeper.exists(path, true).getVersion();
zkeeper.setData(path, data, version);
}
}
操作要点:
- 创建:使用
OPEN_ACL_UNSAFE
简化权限(生产环境需谨慎) - 读取:直接获取字节数据并转为字符串
- 更新:通过版本号实现乐观锁机制
- ❌ 踩坑提示:更新时必须传入当前版本,否则会抛出异常
7. 总结
ZooKeeper 在分布式系统中扮演关键角色,特别适用于:
- 共享配置存储
- 主节点选举
- 分布式锁实现
其 Java API 提供了优雅的客户端交互方式,通过 ZNode 的版本控制机制确保数据一致性。完整示例代码可在 GitHub 获取。