1. 概述
大多数分布式应用都需要具备一致性和容错能力的有状态组件。Atomix 是一个可嵌入的库,专门用于实现分布式资源的容错性和一致性。
它提供了丰富的 API 来管理资源,包括集合、组和并发工具等。
要开始使用,首先添加以下 Maven 依赖到 pom.xml:
<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix-all</artifactId>
<version>1.0.8</version>
</dependency>
这个依赖提供了基于 Netty 的传输层,供节点间通信使用。
2. 集群启动
使用 Atomix 首先需要启动一个集群。
Atomix 由一组副本(replica)组成,用于创建有状态的分布式资源。每个副本都维护集群中所有资源的状态副本。
集群中的副本分为两种类型:活跃副本(active)和被动副本(passive)。
分布式资源的状态变更通过活跃副本传播,而被动副本则保持同步以实现容错。
2.1. 启动嵌入式集群
要启动单节点集群,首先创建一个 AtomixReplica
实例:
AtomixReplica replica = AtomixReplica.builder(
new Address("localhost", 8700))
.withStorage(storage)
.withTransport(new NettyTransport())
.build();
这里需要配置 Storage
和 Transport
。存储配置代码如下:
Storage storage = Storage.builder()
.withDirectory(new File("logs"))
.withStorageLevel(StorageLevel.DISK)
.build();
配置好存储和传输后,通过调用 bootstrap()
启动副本——该方法返回一个 CompletableFuture
,可调用其阻塞方法 join()
等待服务器启动完成:
CompletableFuture<AtomixReplica> future = replica.bootstrap();
future.join();
现在我们构建了一个单节点集群。接下来可以向其中添加更多节点。
为此需要创建其他副本并加入现有集群;注意调用 join(Address)
方法时需在新线程中执行:
AtomixReplica replica2 = AtomixReplica.builder(
new Address("localhost", 8701))
.withStorage(storage)
.withTransport(new NettyTransport())
.build();
replica2
.join(new Address("localhost", 8700))
.join();
AtomixReplica replica3 = AtomixReplica.builder(
new Address("localhost", 8702))
.withStorage(storage)
.withTransport(new NettyTransport())
.build();
replica3.join(
new Address("localhost", 8700),
new Address("localhost", 8701))
.join();
现在我们启动了一个三节点集群。也可以通过 bootstrap(List<Address>)
方法传入地址列表来启动集群:
List<Address> cluster = Arrays.asList(
new Address("localhost", 8700),
new Address("localhost", 8701),
new Address("localhost", 8702)); // 修正原文拼写错误
AtomixReplica replica1 = AtomixReplica
.builder(cluster.get(0))
.build();
replica1.bootstrap(cluster).join();
AtomixReplica replica2 = AtomixReplica
.builder(cluster.get(1))
.build();
replica2.bootstrap(cluster).join();
AtomixReplica replica3 = AtomixReplica
.builder(cluster.get(2))
.build();
replica3.bootstrap(cluster).join();
每个副本都需要在新线程中启动。
2.2. 启动独立集群
Atomix 服务器可作为独立服务器运行,可从 Maven Central 下载。简单说——这是一个 Java 归档文件,可通过终端运行,需在地址标志中提供 host:port
参数并使用 -bootstrap
标志。
启动集群的命令如下:
java -jar atomix-standalone-server.jar
-address 127.0.0.1:8700 -bootstrap -config atomix.properties
其中 atomix.properties
是配置存储和传输的配置文件。要创建多节点集群,可使用 -join
标志向现有集群添加节点:
java -jar atomix-standalone-server.jar
-address 127.0.0.1:8701 -join 127.0.0.1:8700
3. 客户端操作
Atomix 支持通过 AtomixClient
API 创建客户端远程访问集群。
由于客户端无需保持状态,AtomixClient
不需要任何存储。创建客户端时只需配置传输层,用于与集群通信:
AtomixClient client = AtomixClient.builder()
.withTransport(new NettyTransport())
.build();
现在需要将客户端连接到集群。
可以声明一个 Address
列表,并将其作为参数传递给客户端的 connect()
方法:
client.connect(cluster)
.thenRun(() -> {
System.out.println("客户端已连接到集群!");
});
4. 资源管理
Atomix 的真正威力在于其强大的 API,用于创建和管理分布式资源。资源在集群中被复制和持久化,并由复制状态机支撑——其底层实现基于 Raft 共识算法。
分布式资源可通过其 get()
方法创建和管理。我们可以从 AtomixReplica
创建分布式资源实例。
假设 replica
是 AtomixReplica
的实例,创建分布式映射资源并设置值的代码如下:
replica.getMap("map")
.thenCompose(m -> m.put("bar", "Hello world!"))
.thenRun(() -> System.out.println("值已设置到分布式映射"))
.join();
这里的 join()
方法会阻塞程序,直到资源创建完成且值被设置。我们可以使用 AtomixClient
获取同一对象,并通过 get("bar")
方法检索值:
String value = client.getMap("map")
.thenCompose(m -> m.get("bar"))
.thenApply(a -> (String) a)
.get();
5. 一致性与容错性
Atomix 适用于关键任务型小规模数据集,这类场景中一致性比可用性更重要。
它通过线性一致性(linearizability)为读写操作提供强可配置一致性。在线性一致性模型中,一旦写入提交,所有客户端都能保证感知到结果状态。
Atomix 集群的一致性由底层 Raft 共识算法保证,选举出的领导者将拥有所有先前成功的写入操作。
所有新写入都通过集群领导者进行,并在完成前同步复制到大多数服务器。
为保持容错性,集群中大多数服务器需要存活。如果少数节点故障,这些节点将被标记为不活跃,并由被动节点或备用节点替代。
⚠️ 关键点:
- 领导者故障时,集群中剩余服务器将开始新的领导者选举。期间集群将不可用
- 发生网络分区时:
- 若领导者处于非法定人数侧,它将下台,法定人数侧选举新领导者
- 若领导者处于多数侧,它将继续正常工作
- 分区解决后,非法定人数侧的节点将加入法定人数并更新日志
6. 总结
与 ZooKeeper 类似,Atomix 为处理分布式计算问题提供了强大的库集合。
完整源代码可在 GitHub 获取。