1. 概述

本文将介绍 JCTools(Java Concurrency Tools)库。简单来说,它提供了一系列专为多线程环境设计的实用数据结构,能帮我们更高效地处理并发问题。

2. 非阻塞算法

传统多线程代码操作共享可变状态时,通常依赖来保证数据一致性和可见性。但这种方式存在明显缺陷:

  • ❌ 线程可能因获取锁而阻塞,导致并行度下降
  • ❌ 锁竞争越激烈,JVM 调度开销越大,实际工作效率越低
  • ❌ 多锁场景下可能出现死锁
  • ❌ 存在优先级反转风险(高优先级线程被低优先级线程持有的锁阻塞)
  • ❌ 粗粒度锁会严重限制并行性,而细粒度锁设计复杂且易出错

非阻塞算法是更好的替代方案:任何线程的失败或暂停不会影响其他线程。这类算法分为两类:

  • 无锁(Lock-Free):保证至少有一个线程能在有限时间内取得进展(不会死锁)
  • 无等待(Wait-Free):保证每个线程都能在有限时间内取得进展

下面是一个经典的非阻塞栈实现(出自《Java Concurrency in Practice》):

public class ConcurrentStack<E> {
    AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();

    private static class Node <E> {
        public E item;
        public Node<E> next;
        // 标准构造器
    }
}

核心操作方法:

public void push(E item){
    Node<E> newHead = new Node<E>(item);
    Node<E> oldHead;
    
    do {
        oldHead = top.get();
        newHead.next = oldHead;
    } while(!top.compareAndSet(oldHead, newHead));
}

public E pop() {
    Node<E> oldHead;
    Node<E> newHead;
    do {
        oldHead = top.get();
        if (oldHead == null) {
            return null;
        }
        newHead = oldHead.next;
    } while (!top.compareAndSet(oldHead, newHead));
    
    return oldHead.item;
}

这个实现通过细粒度的 CAS(Compare-And-Swap) 操作实现无锁特性(即使多线程同时调用 top.compareAndSet(),也必然有一个成功),但不是无等待的(无法保证特定线程的 CAS 操作最终成功)。

3. 依赖配置

pom.xml 中添加 JCTools 依赖:

<dependency>
    <groupId>org.jctools</groupId>
    <artifactId>jctools-core</artifactId>
    <version>2.1.2</version>
</dependency>

最新版本可在 Maven Central 查询。

4. JCTools 队列详解

JCTools 提供了多种线程安全的无锁队列,核心接口是 org.jctools.queues.MessagePassingQueue

4.1. 队列类型分类

根据生产者/消费者策略分为四类:

类型 前缀 示例 适用场景
单生产者/单消费者 Spsc SpscArrayQueue 明确的生产消费配对
单生产者/多消费者 Spmc SpmcArrayQueue 多个消费者处理同一生产源
多生产者/单消费者 Mpsc MpscArrayQueue 多个生产者写入单一消费者
多生产者/多消费者 Mpmc MpmcArrayQueue 完全自由的生产消费模式

⚠️ 踩坑警告:队列内部不检查策略合规性!错误使用可能导致数据丢失。例如:

SpscArrayQueue<Integer> queue = new SpscArrayQueue<>(2);

Thread producer1 = new Thread(() -> queue.offer(1));
producer1.start();
producer1.join();

Thread producer2 = new Thread(() -> queue.offer(2));
producer2.start();
producer2.join();

Set<Integer> fromQueue = new HashSet<>();
Thread consumer = new Thread(() -> queue.drain(fromQueue::add));
consumer.start();
consumer.join();

assertThat(fromQueue).containsOnly(1, 2); // 测试通过但实际不安全!

4.2. 队列实现清单

所有队列实现汇总:

  • **SpscArrayQueue**:单生产者/单消费者,数组实现,有界容量
  • **SpscLinkedQueue**:单生产者/单消费者,链表实现,无界容量
  • **SpscChunkedArrayQueue**:单生产者/单消费者,初始容量可动态增长至最大容量
  • **SpscGrowableArrayQueue**:同上,但内部块管理不同(推荐用 SpscChunkedArrayQueue
  • **SpscUnboundedArrayQueue**:单生产者/单消费者,数组实现,无界容量
  • **SpmcArrayQueue**:单生产者/多消费者,数组实现,有界容量
  • **MpscArrayQueue**:多生产者/单消费者,数组实现,有界容量
  • **MpscLinkedQueue**:多生产者/单消费者,链表实现,无界容量
  • **MpmcArrayQueue**:多生产者/多消费者,数组实现,有界容量

4.3. Atomic 队列变体

前述队列均依赖 sun.misc.Unsafe,但在 Java 9+(JEP-260)中默认不可用。因此提供了基于 java.util.concurrent.atomic.AtomicLongFieldUpdater 的替代方案:

  • 命名规则:在原队列名中插入 Atomic(如 SpscChunkedAtomicArrayQueue
  • 性能略低,但兼容性更好
  • 推荐优先使用常规队列,仅在 HotSpot Java9+ 或 JRockit 环境中使用 Atomic 变体

4.4. 容量管理

JCTools 队列分为有界和无界两类。有界队列满时会拒绝新元素,典型使用模式:

SpscChunkedArrayQueue<Integer> queue = new SpscChunkedArrayQueue<>(8, 16);
CountDownLatch startConsuming = new CountDownLatch(1);
CountDownLatch awakeProducer = new CountDownLatch(1);

Thread producer = new Thread(() -> {
    // 填满队列
    IntStream.range(0, queue.capacity()).forEach(i -> {
        assertThat(queue.offer(i)).isTrue();
    });
    // 尝试添加超出容量的元素
    assertThat(queue.offer(queue.capacity())).isFalse(); // 应失败
    
    startConsuming.countDown();
    awakeProducer.await();
    // 消费后再次添加
    assertThat(queue.offer(queue.capacity())).isTrue(); // 应成功
});

producer.start();
startConsuming.await();

// 消费队列
Set<Integer> fromQueue = new HashSet<>();
queue.drain(fromQueue::add);
awakeProducer.countDown();
producer.join();
queue.drain(fromQueue::add);

assertThat(fromQueue).containsAll(
  IntStream.range(0, 17).boxed().collect(toSet()));

5. 其他数据结构

除队列外,JCTools 还提供:

  • **NonBlockingHashMap**:无锁 ConcurrentHashMap 替代品,扩展性更好,变更成本更低(依赖 Unsafe,Java9+ 不推荐)
  • **NonBlockingHashMapLong**:同上,但使用 long 原始类型键
  • **NonBlockingHashSet**:基于 NonBlockingHashMap 的简单封装(类似 JDK 的 Collections.newSetFromMap()
  • **NonBlockingIdentityHashMap**:同上,但通过对象标识比较键
  • **NonBlockingSetInt**:基于 long 数组的多线程位向量集合(注意避免自动装箱)

6. 性能测试对比

使用 JMH 对比 JDK 的 ArrayBlockingQueue 和 JCTools 队列性能(关键代码片段):

public class MpmcBenchmark {
    @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK})
    public volatile String implementation;

    public volatile Queue<Long> queue;

    @Benchmark
    @Group(GROUP_NAME)
    @GroupThreads(PRODUCER_THREADS_NUMBER)
    public void write(Control control) {
        while (!control.stopMeasurement && !queue.offer(1L)) {
            // 空循环等待
        }
    }

    @Benchmark
    @Group(GROUP_NAME)
    @GroupThreads(CONSUMER_THREADS_NUMBER)
    public void read(Control control) {
        while (!control.stopMeasurement && queue.poll() == null) {
            // 空循环等待
        }
    }
}

测试结果(95% 百分位延迟,单位:纳秒/操作):

MpmcArrayQueue:      1052.000 ns/op
MpmcAtomicArrayQueue:1106.000 ns/op
ArrayBlockingQueue:  2364.000 ns/op

结论

  • MpmcArrayQueueMpmcAtomicArrayQueue 略快
  • ArrayBlockingQueue 性能落后约 2 倍

7. 使用注意事项

JCTools 的主要风险在于无法强制正确使用

  • ⚠️ 在大型项目中,误用 MpscArrayQueue(要求单消费者)可能导致多个消费者线程读取数据
  • ⚠️ 这种错误不会立即报错,但会导致消费者丢失消息,且极难调试
  • ⚠️ 目前缺少运行时策略校验机制(如通过系统属性启用测试环境检查)

其他考虑:

  • 即使 JCTools 队列更快,但多数应用是 I/O 密集型,线程间数据交换量有限,性能提升可能不明显
  • 应仅在高吞吐量线程通信场景下使用

8. 总结

JCTools 提供了高性能的无锁并发数据结构,在重负载下显著优于 JDK 对应实现。但需注意:

适用场景:线程间需要频繁交换大量对象
使用前提:必须严格遵守生产者/消费者策略
不适用场景:I/O 密集型应用或线程通信较少的系统

完整示例代码见 GitHub 仓库


原始标题:Java Concurrency Utility with JCTools