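2. Prerequisites

Before starting, it helps to be familiar with the basic concepts of Apache Zookeeper. This article assumes a Zookeeper instance is already running locally at 127.0.0.1:2181; if you are new to Zookeeper, see the official installation guide.

First, add the Maven dependency: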

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-async</artifactId>
    <version>4.0.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

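⚠️ Note: Curator 4.X.X has a hard dependency on Zookeeper 3.5.X, which is still in beta. This article therefore uses the stable Zookeeper 3.4.11 instead. That is why the dependency above excludes the transitive zookeeper artifact; the stable version has to be added explicitly: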

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.11</version>
</dependency>

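See the official documentation for details on compatibility.

3. Connection Management

The basic use case of Curator is connecting to a running Zookeeper instance. It simplifies this with a factory class that builds connections using a retry policy: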

int sleepMsBetweenRetries = 100;
int maxRetries = 3;
RetryPolicy retryPolicy = new RetryNTimes(
  maxRetries, sleepMsBetweenRetries);

CuratorFramework client = CuratorFrameworkFactory
  .newClient("127.0.0.1:2181", retryPolicy);
client.start();
 
assertThat(client.checkExists().forPath("/")).isNotNull();

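In this example, a failed connection is retried up to 3 times, with 100 ms between attempts. Once connected, the CuratorFramework client can be used to work with Zookeeper, for example to browse paths and to read and write data.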
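
To make the two retry settings concrete, the following plain-Java sketch mimics a retry-N-times loop: one initial attempt, then up to maxRetries retries with a fixed pause before each. It only illustrates the idea and is not Curator's RetryNTimes implementation; the class RetrySketch, the method callWithRetries, and the simulated failing operation are hypothetical names introduced for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, NOT part of Curator: illustrates retry-N-times semantics.
final class RetrySketch {

    // Runs the operation once, then retries up to maxRetries times on failure,
    // sleeping sleepMsBetweenRetries milliseconds before each retry.
    static <T> T callWithRetries(Callable<T> operation, int maxRetries,
                                 long sleepMsBetweenRetries) throws Exception {
        int retriesDone = 0;
        while (true) {
            try {
                return operation.call();
            } catch (Exception e) {
                if (retriesDone >= maxRetries) {
                    throw e; // retries exhausted: surface the last failure
                }
                retriesDone++;
                Thread.sleep(sleepMsBetweenRetries);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger attempts = new AtomicInteger();
        // Simulated operation (assumption): fails twice, then succeeds.
        String result = callWithRetries(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("connection lost");
            }
            return "connected";
        }, 3, 100);
        System.out.println(result + " after " + attempts.get() + " attempts");
    }
}
```

With maxRetries set to 3, an operation that never succeeds is attempted four times in total before the last exception is rethrown.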

4. Async

The Curator Async module wraps the CuratorFramework client to provide non-blocking operations based on the Java 8 CompletionStage API:

int sleepMsBetweenRetries = 100;
int maxRetries = 3;
RetryPolicy retryPolicy 
  = new RetryNTimes(maxRetries, sleepMsBetweenRetries);

CuratorFramework client = CuratorFrameworkFactory
  .newClient("127.0.0.1:2181", retryPolicy);

client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);

AtomicBoolean exists = new AtomicBoolean(false);

async.checkExists()
  .forPath("/")
  .thenAcceptAsync(s -> exists.set(s != null));

await().until(() -> assertThat(exists.get()).isTrue());

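checkExists() now runs asynchronously and does not block the main thread. Follow-up actions can be chained with thenAcceptAsync() and the other CompletionStage methods.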
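
Because the async calls return a CompletionStage, results can be transformed and failures handled with the standard Java 8 API. The sketch below uses a plain CompletableFuture as a stand-in for the stage an async call would return, so it runs without Zookeeper; the class StageChainingSketch and the describe helper are hypothetical names, not Curator API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Stand-in demo: a CompletableFuture plays the role of the stage an async call returns.
final class StageChainingSketch {

    // Maps a (possibly null) result to a message and turns a failure into a fallback message.
    static <T> CompletionStage<String> describe(CompletionStage<T> stage) {
        return stage
          .thenApply(stat -> stat != null ? "node exists" : "node missing")
          .exceptionally(e -> {
              // Failures propagated through thenApply arrive wrapped in CompletionException.
              Throwable cause = e.getCause() != null ? e.getCause() : e;
              return "lookup failed: " + cause.getMessage();
          });
    }

    public static void main(String[] args) {
        CompletableFuture<Object> found = CompletableFuture.completedFuture(new Object());
        CompletableFuture<Object> missing = CompletableFuture.completedFuture(null);
        CompletableFuture<Object> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("connection lost"));

        System.out.println(describe(found).toCompletableFuture().join());
        System.out.println(describe(missing).toCompletableFuture().join());
        System.out.println(describe(failed).toCompletableFuture().join());
    }
}
```

Handling the failure inside the chain keeps the error path next to the success path, which is easy to forget when callbacks run on another thread.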

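5. Configuration Management

Managing shared configuration is a common challenge in distributed systems. Zookeeper can serve as the central configuration store: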

CuratorFramework client = newClient();
client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
String key = getKey();
String expected = "my_value";

client.create().forPath(key);

async.setData()
  .forPath(key, expected.getBytes());

AtomicBoolean isEquals = new AtomicBoolean();
async.getData()
  .forPath(key)
  .thenAccept(data -> isEquals.set(new String(data).equals(expected)));

await().until(() -> assertThat(isEquals.get()).isTrue());

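This example creates the node path, sets its data, and then reads the data back to verify it. The key can be a path such as /config/dev/my_key; if its parent nodes do not exist yet, they have to be created first, for instance with client.create().creatingParentsIfNeeded().forPath(key).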

5.1. Watchers

Zookeeper's watch mechanism lets an application react to configuration changes as they happen, without redeployment:

CuratorFramework client = newClient();
client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
String key = getKey();
String expected = "my_value";

async.create().forPath(key);

List<String> changes = new ArrayList<>();

async.watched()
  .getData()
  .forPath(key)
  .event()
  .thenAccept(watchedEvent -> {
    try {
        changes.add(new String(client.getData()
          .forPath(watchedEvent.getPath())));
    } catch (Exception e) {
        // handle the exception...
    }});

// Changing the data triggers the watcher
async.setData()
  .forPath(key, expected.getBytes());

await()
  .until(() -> assertThat(changes.size()).isEqualTo(1));

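Once the watcher is configured, a data change fires an event. A watcher can be set on a single node or on a set of nodes.

6. Strongly Typed Models

Zookeeper natively works with byte arrays, so data has to be serialized and deserialized by hand. Curator provides strongly typed models to simplify this.

First, add the Jackson dependency: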

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.13.0</version>
</dependency>

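Define the configuration class:

public class HostConfig {
    private String hostname;
    private int port;

    public HostConfig() { }  // no-argument constructor, needed by Jackson for deserialization
    public HostConfig(String hostname, int port) {
        this.hostname = hostname;
        this.port = port;
    }

    // getters and setters
}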

Then use the modeled framework:

ModelSpec<HostConfig> mySpec = ModelSpec.builder(
  ZPath.parseWithIds("/config/dev"), 
  JacksonModelSerializer.build(HostConfig.class))
  .build();

CuratorFramework client = newClient();
client.start();

AsyncCuratorFramework async 
  = AsyncCuratorFramework.wrap(client);
ModeledFramework<HostConfig> modeledClient 
  = ModeledFramework.wrap(async, mySpec);

modeledClient.set(new HostConfig("host-name", 8080));

modeledClient.read()
  .whenComplete((value, e) -> {
     if (e != null) {
          fail("Cannot read host config", e);
     } else {
          assertThat(value).isNotNull();
          assertThat(value.getHostname()).isEqualTo("host-name");
          assertThat(value.getPort()).isEqualTo(8080);
     }
   });

Reading the path /config/dev returns a HostConfig object directly, with no manual serialization required.
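
For contrast, the sketch below shows the kind of hand-written byte[] conversion that the typed model makes unnecessary. It is plain Java with no Curator or Jackson involved; the "hostname:port" text format and the ManualHostConfigCodec class are assumptions chosen purely for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical codec: what storing HostConfig-like data as raw bytes could involve.
final class ManualHostConfigCodec {

    // Encodes the two fields as UTF-8 text in the form "hostname:port".
    static byte[] toBytes(String hostname, int port) {
        return (hostname + ":" + port).getBytes(StandardCharsets.UTF_8);
    }

    // Extracts the hostname: everything before the last ':'.
    static String hostnameOf(byte[] data) {
        String text = new String(data, StandardCharsets.UTF_8);
        return text.substring(0, text.lastIndexOf(':'));
    }

    // Extracts the port: the digits after the last ':'.
    static int portOf(byte[] data) {
        String text = new String(data, StandardCharsets.UTF_8);
        return Integer.parseInt(text.substring(text.lastIndexOf(':') + 1));
    }

    public static void main(String[] args) {
        // Bytes as they would be written to, and later read from, a znode.
        byte[] stored = toBytes("host-name", 8080);
        System.out.println(hostnameOf(stored) + " / " + portOf(stored));
    }
}
```

Every new field would require touching both the encoding and the decoding side, which is the maintenance cost a serializer-backed model avoids.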

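7. Recipes

The Zookeeper documentation includes a guide to higher-level solutions such as leader election and distributed locks. Curator implements most of them; the full list is in the official documentation.

Add the dependency: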

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.1</version>
</dependency>

7.1. Leader Election

In a distributed environment, a leader node often has to be elected to coordinate a job:

CuratorFramework client = newClient();
client.start();
LeaderSelector leaderSelector = new LeaderSelector(client, 
  "/mutex/select/leader/for/job/A", 
  new LeaderSelectorListener() {
      @Override
      public void stateChanged(
        CuratorFramework client, 
        ConnectionState newState) {
      }

      @Override
      public void takeLeadership(
        CuratorFramework client) throws Exception {
      }
  });

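// Join the group of candidates
leaderSelector.start();

// Leave the election once the job is done
leaderSelector.close();

After start() is called, the client joins the group of candidates under the path /mutex/select/leader/for/job/A. When it is elected leader, takeLeadership is invoked, and the coordination logic belongs there; leadership is held until that method returns.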

7.2. Shared Locks

The InterProcessSemaphoreMutex recipe implements a fully distributed lock:

CuratorFramework client = newClient();
client.start();
InterProcessSemaphoreMutex sharedLock = new InterProcessSemaphoreMutex(
  client, "/mutex/process/A");

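sharedLock.acquire();
try {
    // Do task A
} finally {
    // Always release the lock, even if the task throws
    sharedLock.release();
}

While the lock is held, Zookeeper guarantees that no other client holds the same lock, which prevents concurrent conflicts.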

7.3. Counters

The SharedCount recipe coordinates an integer value shared among multiple clients:

CuratorFramework client = newClient();
client.start();

SharedCount counter = new SharedCount(client, "/counters/A", 0);
counter.start();

counter.setCount(counter.getCount() + 1);

assertThat(counter.getCount()).isEqualTo(1);

Zookeeper stores the integer value at the path /counters/A and initializes it to 0 if the path does not exist yet.
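
Note that counter.setCount(counter.getCount() + 1) is a read followed by a write, so two clients running it at the same time could overwrite each other's update. A common remedy is an optimistic compare-and-set loop, and SharedCount offers trySetCount for conditional updates of this kind (check the Javadoc of your Curator version for the exact signature). The sketch below demonstrates the pattern itself with a local AtomicInteger standing in for the shared value, so it runs without Zookeeper; it is an analogy rather than Curator code, and the class name is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Analogy only: AtomicInteger stands in for a value shared between clients.
final class CasCounterSketch {

    // Retries until the value is updated from exactly the value that was read.
    static int increment(AtomicInteger shared) {
        while (true) {
            int current = shared.get();
            int next = current + 1;
            if (shared.compareAndSet(current, next)) {
                return next;
            }
            // Another writer changed the value in between: re-read and retry.
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger shared = new AtomicInteger(0);
        Runnable worker = () -> {
            for (int i = 0; i < 1000; i++) {
                increment(shared);
            }
        };
        Thread a = new Thread(worker);
        Thread b = new Thread(worker);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(shared.get()); // both workers' increments are counted
    }
}
```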

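8. Conclusion

This article covered the core features of Apache Curator: connection management, async operations, configuration management, strongly typed models, and common distributed-systems solutions (recipes). Together they make Zookeeper considerably easier to use when building distributed systems.

The complete code examples are available in the GitHub repository.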


Original title: Introduction to Apache Curator