1. 概述
在分布式系统中,当集群包含N个副本节点时,我们可能会遇到网络分区——部分节点暂时无法相互通信的情况。这种现象被称为脑裂(split-brain)。
当系统发生脑裂时,即使来自同一用户的写请求也可能被路由到无法互相通信的不同副本。此时系统虽然保持可用性,但数据一致性被破坏。
当分裂的集群之间网络恢复后,我们需要决定如何处理这些不一致的数据。这就是CRDT(无冲突复制数据类型)要解决的问题。
2. CRDT的解决方案
假设有两个节点A和B因脑裂断开连接。用户修改登录名时,第一次请求到达节点A,第二次修改请求到达节点B。
由于脑裂,两个节点无法同步数据。当网络恢复后,我们需要决定该用户的登录名最终应该是什么。我们可以选择:
- 让用户手动解决冲突(如Google Docs的做法)
- 使用CRDT自动合并分歧副本的数据
3. Maven依赖
首先添加wurmloch-crdt库的依赖,它提供了多种实用的CRDT实现:
<dependency>
<groupId>com.netopyr.wurmloch</groupId>
<artifactId>wurmloch-crdt</artifactId>
<version>0.1.0</version>
</dependency>
最新版本可在Maven Central获取。
4. 增长集合(GSet)
最基础的CRDT是增长集合(Grow-Only Set)。✅ 元素只能添加不能删除,合并时直接计算两个集合的并集。
先创建两个副本模拟分布式环境,并用connect()
方法建立连接:
LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);
在第一个副本上创建GSet,并在第二个副本上引用它:
GSet<String> replica1 = crdtStore1.createGSet("ID_1");
GSet<String> replica2 = crdtStore2.<String>findGSet("ID_1").get();
集群正常工作时,从不同副本添加元素会自动同步:
replica1.add("apple");
replica2.add("banana");
assertThat(replica1).contains("apple", "banana");
assertThat(replica2).contains("apple", "banana");
模拟网络分区(断开连接):
crdtStore1.disconnect(crdtStore2);
此时添加的元素不会同步:
replica1.add("strawberry");
replica2.add("pear");
assertThat(replica1).contains("apple", "banana", "strawberry");
assertThat(replica2).contains("apple", "banana", "pear");
⚠️ 网络恢复后,GSet自动通过并集合并数据:
crdtStore1.connect(crdtStore2);
assertThat(replica1)
.contains("apple", "banana", "strawberry", "pear");
assertThat(replica2)
.contains("apple", "banana", "strawberry", "pear");
5. 单增计数器(GCounter)
单增计数器(Increment-Only Counter)在本地聚合所有增量。副本同步时,结果值等于所有节点增量的总和——类似于java.concurrent.LongAdder
,但在更高抽象层实现。
创建GCounter并从不同副本递增:
LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);
GCounter replica1 = crdtStore1.createGCounter("ID_1");
GCounter replica2 = crdtStore2.findGCounter("ID_1").get();
replica1.increment();
replica2.increment(2L);
assertThat(replica1.get()).isEqualTo(3L);
assertThat(replica2.get()).isEqualTo(3L);
网络分区后,本地递增操作会导致数据不一致:
crdtStore1.disconnect(crdtStore2);
replica1.increment(3L);
replica2.increment(5L);
assertThat(replica1.get()).isEqualTo(6L);
assertThat(replica2.get()).isEqualTo(8L);
集群恢复后,增量自动合并得到正确值:
crdtStore1.connect(crdtStore2);
assertThat(replica1.get())
.isEqualTo(11L);
assertThat(replica2.get())
.isEqualTo(11L);
6. PN计数器
PN计数器(PNCounter)扩展了单增计数器,支持增减操作。它分别存储所有增量和减量,合并时结果值 = 增量总和 - 减量总和:
@Test
public void givenPNCounter_whenReplicasDiverge_thenMergesWithoutConflict() {
LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);
PNCounter replica1 = crdtStore1.createPNCounter("ID_1");
PNCounter replica2 = crdtStore2.findPNCounter("ID_1").get();
replica1.increment();
replica2.decrement(2L);
assertThat(replica1.get()).isEqualTo(-1L);
assertThat(replica2.get()).isEqualTo(-1L);
crdtStore1.disconnect(crdtStore2);
replica1.decrement(3L);
replica2.increment(5L);
assertThat(replica1.get()).isEqualTo(-4L);
assertThat(replica2.get()).isEqualTo(4L);
crdtStore1.connect(crdtStore2);
assertThat(replica1.get()).isEqualTo(1L);
assertThat(replica2.get()).isEqualTo(1L);
}
7. 最后写入者获胜寄存器(LWWRegister)
当业务规则更复杂时,集合和计数器可能不够用。最后写入者获胜寄存器(Last-Writer-Wins Register)在合并时只保留最新更新的值(Cassandra就采用这种策略解决冲突)。
❌ 使用此策略要格外小心,它会丢弃中间发生的变更。
创建两个副本和LWWRegister实例:
LocalCrdtStore crdtStore1 = new LocalCrdtStore("N_1");
LocalCrdtStore crdtStore2 = new LocalCrdtStore("N_2");
crdtStore1.connect(crdtStore2);
LWWRegister<String> replica1 = crdtStore1.createLWWRegister("ID_1");
LWWRegister<String> replica2 = crdtStore2.<String>findLWWRegister("ID_1").get();
replica1.set("apple");
replica2.set("banana");
assertThat(replica1.get()).isEqualTo("banana");
assertThat(replica2.get()).isEqualTo("banana");
当副本1设置"apple"
后副本2设置"banana"
,LWWRegister只保留最后值。
模拟网络分区:
crdtStore1.disconnect(crdtStore2);
replica1.set("strawberry");
replica2.set("pear");
assertThat(replica1.get()).isEqualTo("strawberry");
assertThat(replica2.get()).isEqualTo("pear");
每个副本保留本地不一致的数据副本。调用set()
时,LWWRegister内部通过向量时钟(VectorClock)算法为每次更新分配版本号。
集群同步时,自动选择版本号最高的值并丢弃所有旧版本:
crdtStore1.connect(crdtStore2);
assertThat(replica1.get()).isEqualTo("pear");
assertThat(replica2.get()).isEqualTo("pear");
8. 结论
本文展示了分布式系统在保证可用性时面临的一致性问题。当网络分区导致数据分歧后,CRDT提供了一种简单粗暴的自动合并方案。
我们通过wurmloch-crdt库实现了四种核心CRDT:
- ✅ GSet:只增集合,合并取并集
- ✅ GCounter:单增计数器,合并求和
- ✅ PNCounter:增减计数器,合并用增量-减量
- ⚠️ LWWRegister:最后写入获胜,合并取最新值
所有示例代码可在GitHub项目中获取,这是一个Maven项目,可直接导入运行。