1. 概述
本文将介绍 JCTools(Java Concurrency Tools)库。简单来说,它提供了一系列专为多线程环境设计的实用数据结构,能帮我们更高效地处理并发问题。
2. 非阻塞算法
传统多线程代码操作共享可变状态时,通常依赖锁来保证数据一致性和可见性。但这种方式存在明显缺陷:
- ❌ 线程可能因获取锁而阻塞,导致并行度下降
- ❌ 锁竞争越激烈,JVM 调度开销越大,实际工作效率越低
- ❌ 多锁场景下可能出现死锁
- ❌ 存在优先级反转风险(高优先级线程被低优先级线程持有的锁阻塞)
- ❌ 粗粒度锁会严重限制并行性,而细粒度锁设计复杂且易出错
非阻塞算法是更好的替代方案:任何线程的失败或暂停不会影响其他线程。这类算法分为两类:
- 无锁(Lock-Free):保证至少有一个线程能在有限时间内取得进展(不会死锁)
- 无等待(Wait-Free):保证每个线程都能在有限时间内取得进展
下面是一个经典的非阻塞栈实现(出自《Java Concurrency in Practice》):
public class ConcurrentStack<E> {
AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();
private static class Node <E> {
public E item;
public Node<E> next;
// 标准构造器
}
}
核心操作方法:
public void push(E item){
Node<E> newHead = new Node<E>(item);
Node<E> oldHead;
do {
oldHead = top.get();
newHead.next = oldHead;
} while(!top.compareAndSet(oldHead, newHead));
}
public E pop() {
Node<E> oldHead;
Node<E> newHead;
do {
oldHead = top.get();
if (oldHead == null) {
return null;
}
newHead = oldHead.next;
} while (!top.compareAndSet(oldHead, newHead));
return oldHead.item;
}
这个实现通过细粒度的 CAS(Compare-And-Swap) 操作实现无锁特性(即使多线程同时调用 top.compareAndSet()
,也必然有一个成功),但不是无等待的(无法保证特定线程的 CAS 操作最终成功)。
3. 依赖配置
在 pom.xml
中添加 JCTools 依赖:
<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
<version>2.1.2</version>
</dependency>
最新版本可在 Maven Central 查询。
4. JCTools 队列详解
JCTools 提供了多种线程安全的无锁队列,核心接口是 org.jctools.queues.MessagePassingQueue
。
4.1. 队列类型分类
根据生产者/消费者策略分为四类:
类型 | 前缀 | 示例 | 适用场景 |
---|---|---|---|
单生产者/单消费者 | Spsc |
SpscArrayQueue |
明确的生产消费配对 |
单生产者/多消费者 | Spmc |
SpmcArrayQueue |
多个消费者处理同一生产源 |
多生产者/单消费者 | Mpsc |
MpscArrayQueue |
多个生产者写入单一消费者 |
多生产者/多消费者 | Mpmc |
MpmcArrayQueue |
完全自由的生产消费模式 |
⚠️ 踩坑警告:队列内部不检查策略合规性!错误使用可能导致数据丢失。例如:
SpscArrayQueue<Integer> queue = new SpscArrayQueue<>(2);
Thread producer1 = new Thread(() -> queue.offer(1));
producer1.start();
producer1.join();
Thread producer2 = new Thread(() -> queue.offer(2));
producer2.start();
producer2.join();
Set<Integer> fromQueue = new HashSet<>();
Thread consumer = new Thread(() -> queue.drain(fromQueue::add));
consumer.start();
consumer.join();
assertThat(fromQueue).containsOnly(1, 2); // 测试通过但实际不安全!
4.2. 队列实现清单
所有队列实现汇总:
- **
SpscArrayQueue
**:单生产者/单消费者,数组实现,有界容量 - **
SpscLinkedQueue
**:单生产者/单消费者,链表实现,无界容量 - **
SpscChunkedArrayQueue
**:单生产者/单消费者,初始容量可动态增长至最大容量 - **
SpscGrowableArrayQueue
**:同上,但内部块管理不同(推荐用SpscChunkedArrayQueue
) - **
SpscUnboundedArrayQueue
**:单生产者/单消费者,数组实现,无界容量 - **
SpmcArrayQueue
**:单生产者/多消费者,数组实现,有界容量 - **
MpscArrayQueue
**:多生产者/单消费者,数组实现,有界容量 - **
MpscLinkedQueue
**:多生产者/单消费者,链表实现,无界容量 - **
MpmcArrayQueue
**:多生产者/多消费者,数组实现,有界容量
4.3. Atomic 队列变体
前述队列均依赖 sun.misc.Unsafe
,但在 Java 9+(JEP-260)中默认不可用。因此提供了基于 java.util.concurrent.atomic.AtomicLongFieldUpdater
的替代方案:
- 命名规则:在原队列名中插入
Atomic
(如SpscChunkedAtomicArrayQueue
) - 性能略低,但兼容性更好
- 推荐优先使用常规队列,仅在 HotSpot Java9+ 或 JRockit 环境中使用 Atomic 变体
4.4. 容量管理
JCTools 队列分为有界和无界两类。有界队列满时会拒绝新元素,典型使用模式:
SpscChunkedArrayQueue<Integer> queue = new SpscChunkedArrayQueue<>(8, 16);
CountDownLatch startConsuming = new CountDownLatch(1);
CountDownLatch awakeProducer = new CountDownLatch(1);
Thread producer = new Thread(() -> {
// 填满队列
IntStream.range(0, queue.capacity()).forEach(i -> {
assertThat(queue.offer(i)).isTrue();
});
// 尝试添加超出容量的元素
assertThat(queue.offer(queue.capacity())).isFalse(); // 应失败
startConsuming.countDown();
awakeProducer.await();
// 消费后再次添加
assertThat(queue.offer(queue.capacity())).isTrue(); // 应成功
});
producer.start();
startConsuming.await();
// 消费队列
Set<Integer> fromQueue = new HashSet<>();
queue.drain(fromQueue::add);
awakeProducer.countDown();
producer.join();
queue.drain(fromQueue::add);
assertThat(fromQueue).containsAll(
IntStream.range(0, 17).boxed().collect(toSet()));
5. 其他数据结构
除队列外,JCTools 还提供:
- **
NonBlockingHashMap
**:无锁ConcurrentHashMap
替代品,扩展性更好,变更成本更低(依赖Unsafe
,Java9+ 不推荐) - **
NonBlockingHashMapLong
**:同上,但使用long
原始类型键 - **
NonBlockingHashSet
**:基于NonBlockingHashMap
的简单封装(类似 JDK 的Collections.newSetFromMap()
) - **
NonBlockingIdentityHashMap
**:同上,但通过对象标识比较键 - **
NonBlockingSetInt
**:基于long
数组的多线程位向量集合(注意避免自动装箱)
6. 性能测试对比
使用 JMH 对比 JDK 的 ArrayBlockingQueue
和 JCTools 队列性能(关键代码片段):
public class MpmcBenchmark {
@Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK})
public volatile String implementation;
public volatile Queue<Long> queue;
@Benchmark
@Group(GROUP_NAME)
@GroupThreads(PRODUCER_THREADS_NUMBER)
public void write(Control control) {
while (!control.stopMeasurement && !queue.offer(1L)) {
// 空循环等待
}
}
@Benchmark
@Group(GROUP_NAME)
@GroupThreads(CONSUMER_THREADS_NUMBER)
public void read(Control control) {
while (!control.stopMeasurement && queue.poll() == null) {
// 空循环等待
}
}
}
测试结果(95% 百分位延迟,单位:纳秒/操作):
MpmcArrayQueue: 1052.000 ns/op
MpmcAtomicArrayQueue:1106.000 ns/op
ArrayBlockingQueue: 2364.000 ns/op
✅ 结论:
-
MpmcArrayQueue
比MpmcAtomicArrayQueue
略快 -
ArrayBlockingQueue
性能落后约 2 倍
7. 使用注意事项
JCTools 的主要风险在于无法强制正确使用:
- ⚠️ 在大型项目中,误用
MpscArrayQueue
(要求单消费者)可能导致多个消费者线程读取数据 - ⚠️ 这种错误不会立即报错,但会导致消费者丢失消息,且极难调试
- ⚠️ 目前缺少运行时策略校验机制(如通过系统属性启用测试环境检查)
其他考虑:
- 即使 JCTools 队列更快,但多数应用是 I/O 密集型,线程间数据交换量有限,性能提升可能不明显
- 应仅在高吞吐量线程通信场景下使用
8. 总结
JCTools 提供了高性能的无锁并发数据结构,在重负载下显著优于 JDK 对应实现。但需注意:
✅ 适用场景:线程间需要频繁交换大量对象
❌ 使用前提:必须严格遵守生产者/消费者策略
❌ 不适用场景:I/O 密集型应用或线程通信较少的系统
完整示例代码见 GitHub 仓库。