2. 前提条件
开始前建议先了解 Apache Zookeeper 的基本概念。本文假设你已在本地启动了 Zookeeper 实例(地址 127.0.0.1:2181
),新手可参考官方安装指南。
首先添加 Maven 依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-async</artifactId>
<version>4.0.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
⚠️ 注意:Curator 4.X.X 强依赖 Zookeeper 3.5.X(仍为测试版)。因此我们改用稳定版 Zookeeper 3.4.11,需手动排除依赖并添加:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.11</version>
</dependency>
兼容性问题详见官方说明。
3. 连接管理
Curator 的核心功能是连接 Zookeeper 实例。它通过工厂类和重试策略简化连接过程:
int sleepMsBetweenRetries = 100;
int maxRetries = 3;
RetryPolicy retryPolicy = new RetryNTimes(
maxRetries, sleepMsBetweenRetries);
CuratorFramework client = CuratorFrameworkFactory
.newClient("127.0.0.1:2181", retryPolicy);
client.start();
assertThat(client.checkExists().forPath("/")).isNotNull();
此例中,连接失败时将重试 3 次,间隔 100ms。通过 CuratorFramework
客户端即可操作 Zookeeper(如路径浏览、数据读写)。
4. 异步操作
Curator Async 模块基于 Java 8 的 CompletionStage
API,为 CuratorFramework
添加非阻塞能力:
int sleepMsBetweenRetries = 100;
int maxRetries = 3;
RetryPolicy retryPolicy
= new RetryNTimes(maxRetries, sleepMsBetweenRetries);
CuratorFramework client = CuratorFrameworkFactory
.newClient("127.0.0.1:2181", retryPolicy);
client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
AtomicBoolean exists = new AtomicBoolean(false);
async.checkExists()
.forPath("/")
.thenAcceptAsync(s -> exists.set(s != null));
await().until(() -> assertThat(exists.get()).isTrue());
checkExists()
现在异步执行,不阻塞主线程。通过 thenAcceptAsync()
可链式处理后续操作。
5. 配置管理
在分布式系统中,共享配置管理是常见挑战。Zookeeper 可作为配置存储中心:
CuratorFramework client = newClient();
client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
String key = getKey();
String expected = "my_value";
client.create().forPath(key);
async.setData()
.forPath(key, expected.getBytes());
AtomicBoolean isEquals = new AtomicBoolean();
async.getData()
.forPath(key)
.thenAccept(data -> isEquals.set(new String(data).equals(expected)));
await().until(() -> assertThat(isEquals.get()).isTrue());
此例创建节点路径、设置数据后读取验证。key
可以是 /config/dev/my_key
这样的路径。
5.1. 监听器(Watchers)
Zookeeper 的监听机制能实时感知配置变更,无需重新部署:
CuratorFramework client = newClient()
client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
String key = getKey();
String expected = "my_value";
async.create().forPath(key);
List<String> changes = new ArrayList<>();
async.watched()
.getData()
.forPath(key)
.event()
.thenAccept(watchedEvent -> {
try {
changes.add(new String(client.getData()
.forPath(watchedEvent.getPath())));
} catch (Exception e) {
// 处理异常...
}});
// 修改数据触发监听
async.setData()
.forPath(key, expected.getBytes());
await()
.until(() -> assertThat(changes.size()).isEqualTo(1));
配置监听器后,数据变更会触发事件。可监听单个节点或节点集合。
6. 强类型模型
Zookeeper 原生操作字节数组,需手动序列化。Curator 提供强类型模型简化此过程:
首先添加 Jackson 依赖:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.0</version>
</dependency>
定义配置类:
public class HostConfig {
private String hostname;
private int port;
// getters and setters
}
使用模型框架:
ModelSpec<HostConfig> mySpec = ModelSpec.builder(
ZPath.parseWithIds("/config/dev"),
JacksonModelSerializer.build(HostConfig.class))
.build();
CuratorFramework client = newClient();
client.start();
AsyncCuratorFramework async
= AsyncCuratorFramework.wrap(client);
ModeledFramework<HostConfig> modeledClient
= ModeledFramework.wrap(async, mySpec);
modeledClient.set(new HostConfig("host-name", 8080));
modeledClient.read()
.whenComplete((value, e) -> {
if (e != null) {
fail("读取配置失败", e);
} else {
assertThat(value).isNotNull();
assertThat(value.getHostname()).isEqualTo("host-name");
assertThat(value.getPort()).isEqualTo(8080);
}
});
读取 /config/dev
路径时,直接返回 HostConfig
对象,无需手动序列化。
7. 食谱(Recipes)
Zookeeper 官方提供了高级解决方案指南,如领导者选举、分布式锁等。Curator 实现了其中大部分方案,完整列表见官方文档。
添加依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
7.1. 领导者选举
在分布式环境中,常需选举主节点协调任务:
CuratorFramework client = newClient();
client.start();
LeaderSelector leaderSelector = new LeaderSelector(client,
"/mutex/select/leader/for/job/A",
new LeaderSelectorListener() {
@Override
public void stateChanged(
CuratorFramework client,
ConnectionState newState) {
}
@Override
public void takeLeadership(
CuratorFramework client) throws Exception {
}
});
// 加入候选组
leaderSelector.start();
// 任务完成后退出选举
leaderSelector.close();
启动后,节点加入 /mutex/select/leader/for/job/A
路径下的候选组。当选为主节点时,takeLeadership
方法被调用,可在此执行协调逻辑。
7.2. 共享锁
实现完全分布式锁:
CuratorFramework client = newClient();
client.start();
InterProcessSemaphoreMutex sharedLock = new InterProcessSemaphoreMutex(
client, "/mutex/process/A");
sharedLock.acquire();
// 执行任务 A
sharedLock.release();
获取锁时,Zookeeper 确保全局唯一性,避免并发冲突。
7.3. 计数器
协调多客户端共享的整数值:
CuratorFramework client = newClient();
client.start();
SharedCount counter = new SharedCount(client, "/counters/A", 0);
counter.start();
counter.setCount(counter.getCount() + 1);
assertThat(counter.getCount()).isEqualTo(1);
Zookeeper 在 /counters/A
路径存储整数值,路径不存在时初始化为 0。
8. 结论
本文介绍了 Apache Curator 的核心功能:连接管理、异步操作、配置管理、强类型模型及常用分布式解决方案(Recipes)。这些特性极大简化了 Zookeeper 的使用,是构建分布式系统的利器。
完整代码示例见 GitHub 仓库。