1. 概述

大多数分布式应用都需要具备一致性和容错能力的有状态组件。Atomix 是一个可嵌入的库,专门用于实现分布式资源的容错性和一致性

它提供了丰富的 API 来管理资源,包括集合、组和并发工具等。

要开始使用,首先添加以下 Maven 依赖到 pom.xml:

<dependency>
    <groupId>io.atomix</groupId>
    <artifactId>atomix-all</artifactId>
    <version>1.0.8</version>
</dependency>

这个依赖提供了基于 Netty 的传输层,供节点间通信使用。

2. 集群启动

使用 Atomix 首先需要启动一个集群。

Atomix 由一组副本(replica)组成,用于创建有状态的分布式资源。每个副本都维护集群中所有资源的状态副本

集群中的副本分为两种类型:活跃副本(active)和被动副本(passive)。

分布式资源的状态变更通过活跃副本传播,而被动副本则保持同步以实现容错。

2.1. 启动嵌入式集群

要启动单节点集群,首先创建一个 AtomixReplica 实例:

AtomixReplica replica = AtomixReplica.builder(
  new Address("localhost", 8700))
   .withStorage(storage)
   .withTransport(new NettyTransport())
   .build();

这里需要配置 StorageTransport。存储配置代码如下:

Storage storage = Storage.builder()
  .withDirectory(new File("logs"))
  .withStorageLevel(StorageLevel.DISK)
  .build();

配置好存储和传输后,通过调用 bootstrap() 启动副本——该方法返回一个 CompletableFuture,可调用其阻塞方法 join() 等待服务器启动完成:

CompletableFuture<AtomixReplica> future = replica.bootstrap();
future.join();

现在我们构建了一个单节点集群。接下来可以向其中添加更多节点

为此需要创建其他副本并加入现有集群;注意调用 join(Address) 方法时需在新线程中执行:

AtomixReplica replica2 = AtomixReplica.builder(
  new Address("localhost", 8701))
    .withStorage(storage)
    .withTransport(new NettyTransport())
    .build();
  
replica2
  .join(new Address("localhost", 8700))
  .join();

AtomixReplica replica3 = AtomixReplica.builder(
  new Address("localhost", 8702))
    .withStorage(storage)
    .withTransport(new NettyTransport())
    .build();

replica3.join(
  new Address("localhost", 8700), 
  new Address("localhost", 8701))
  .join();

现在我们启动了一个三节点集群。也可以通过 bootstrap(List<Address>) 方法传入地址列表来启动集群:

List<Address> cluster = Arrays.asList(
  new Address("localhost", 8700), 
  new Address("localhost", 8701), 
  new Address("localhost", 8702)); // 修正原文拼写错误

AtomixReplica replica1 = AtomixReplica
  .builder(cluster.get(0))
  .build();
replica1.bootstrap(cluster).join();

AtomixReplica replica2 = AtomixReplica
  .builder(cluster.get(1))
  .build();
            
replica2.bootstrap(cluster).join();

AtomixReplica replica3 = AtomixReplica
  .builder(cluster.get(2))
  .build();

replica3.bootstrap(cluster).join();

每个副本都需要在新线程中启动。

2.2. 启动独立集群

Atomix 服务器可作为独立服务器运行,可从 Maven Central 下载。简单说——这是一个 Java 归档文件,可通过终端运行,需在地址标志中提供 host:port 参数并使用 -bootstrap 标志。

启动集群的命令如下:

java -jar atomix-standalone-server.jar 
  -address 127.0.0.1:8700 -bootstrap -config atomix.properties

其中 atomix.properties 是配置存储和传输的配置文件。要创建多节点集群,可使用 -join 标志向现有集群添加节点:

java -jar atomix-standalone-server.jar 
  -address 127.0.0.1:8701 -join 127.0.0.1:8700

3. 客户端操作

Atomix 支持通过 AtomixClient API 创建客户端远程访问集群。

由于客户端无需保持状态,AtomixClient 不需要任何存储。创建客户端时只需配置传输层,用于与集群通信:

AtomixClient client = AtomixClient.builder()
  .withTransport(new NettyTransport())
  .build();

现在需要将客户端连接到集群。

可以声明一个 Address 列表,并将其作为参数传递给客户端的 connect() 方法:

client.connect(cluster)
  .thenRun(() -> {
      System.out.println("客户端已连接到集群!");
  });

4. 资源管理

Atomix 的真正威力在于其强大的 API,用于创建和管理分布式资源。资源在集群中被复制和持久化,并由复制状态机支撑——其底层实现基于 Raft 共识算法。

分布式资源可通过其 get() 方法创建和管理。我们可以从 AtomixReplica 创建分布式资源实例。

假设 replicaAtomixReplica 的实例,创建分布式映射资源并设置值的代码如下:

replica.getMap("map")
  .thenCompose(m -> m.put("bar", "Hello world!"))
  .thenRun(() -> System.out.println("值已设置到分布式映射"))
  .join();

这里的 join() 方法会阻塞程序,直到资源创建完成且值被设置。我们可以使用 AtomixClient 获取同一对象,并通过 get("bar") 方法检索值:

String value = client.getMap("map")
  .thenCompose(m -> m.get("bar"))
  .thenApply(a -> (String) a)
  .get();

5. 一致性与容错性

Atomix 适用于关键任务型小规模数据集,这类场景中一致性比可用性更重要。

它通过线性一致性(linearizability)为读写操作提供强可配置一致性。在线性一致性模型中,一旦写入提交,所有客户端都能保证感知到结果状态。

Atomix 集群的一致性由底层 Raft 共识算法保证,选举出的领导者将拥有所有先前成功的写入操作。

所有新写入都通过集群领导者进行,并在完成前同步复制到大多数服务器。

为保持容错性,集群中大多数服务器需要存活。如果少数节点故障,这些节点将被标记为不活跃,并由被动节点或备用节点替代。

⚠️ 关键点

  • 领导者故障时,集群中剩余服务器将开始新的领导者选举。期间集群将不可用
  • 发生网络分区时:
    • 若领导者处于非法定人数侧,它将下台,法定人数侧选举新领导者
    • 若领导者处于多数侧,它将继续正常工作
    • 分区解决后,非法定人数侧的节点将加入法定人数并更新日志

6. 总结

与 ZooKeeper 类似,Atomix 为处理分布式计算问题提供了强大的库集合。

完整源代码可在 GitHub 获取。


原始标题:Introduction to Atomix