1. 概述

本文将深入介绍 Java 中几种核心的并发队列实现。如果你对队列的基础概念还不熟悉,可以先阅读我们的 Java Queue 接口指南

并发队列在多线程编程中扮演着至关重要的角色,选对队列类型,往往能直接决定系统的吞吐和稳定性。我们不仅会对比阻塞与非阻塞队列的差异,还会结合实际场景分析每种实现的适用性。

2. 阻塞队列 vs 非阻塞队列

BlockingQueue 提供了一种简单且线程安全的同步机制。它的核心特点是:当队列满时,生产者线程会被阻塞;当队列空时,消费者线程也会被阻塞,直到有新元素可用。

相比之下,非阻塞队列(如 ConcurrentLinkedQueue)在操作失败时不会阻塞线程,而是立即返回特殊值(如 nullfalse),或者抛出异常。

为了实现阻塞语义,BlockingQueue 在标准 Queue 接口的基础上扩展了两个关键方法:

  • put(E e):阻塞式入队,队列满时等待
  • take():阻塞式出队,队列空时等待

这两个方法是构建高可靠生产者-消费者模型的基石。

3. 并发队列实现类详解

3.1. ArrayBlockingQueue

顾名思义,底层使用数组存储元素,因此是一个有界队列(必须指定容量)。

适用场景

  • ✅ 工作任务分发系统(如线程池任务队列)
  • ✅ 内存敏感场景,容量上限可防止 OOM

核心特性

  • ⚠️ 使用单锁(ReentrantLock)控制 puttake,意味着生产和消费不能完全并行,存在竞争
  • ✅ 预分配数组内存,减少运行时开销,但可能导致内存浪费(例如大容量队列长期空闲)
  • ❌ 不支持 null 元素
// 创建一个容量为10的有界队列
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

💡 踩坑提醒:由于是单锁设计,在高并发场景下可能成为性能瓶颈,注意监控锁竞争。

3.2. LinkedBlockingQueue

基于链表结构实现,节点动态创建和回收。

容量控制

  • 默认容量为 Integer.MAX_VALUE(≈ 21亿),接近“无界”
  • 也可通过构造函数指定上限:new LinkedBlockingQueue<>(1000)

核心优势

  • ✅ 使用双锁分离putLocktakeLock,生产与消费可并行执行,显著提升吞吐
  • ✅ 更适合元素频繁增删的场景
// 指定容量的链表队列
BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>(1024);

// 无界模式(慎用!)
BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();

性能对比 ArrayBlockingQueue

维度 ArrayBlockingQueue LinkedBlockingQueue
内存 预分配,可能浪费 动态分配,更灵活
吞吐 单锁,较低 双锁,并发高
GC 压力 高(频繁创建对象)

💡 简单粗暴选型建议:如果队列大小可控且追求极致性能,用 ArrayBlockingQueue;否则优先考虑 LinkedBlockingQueue

3.3. PriorityBlockingQueue

当需要按优先级处理任务时,它是首选。

实现原理

  • 底层是基于数组的二叉堆(最小堆)
  • 入队 put()O(log n),出队 take() 也是 O(log n)
  • 虽然内部使用单锁,但通过自旋优化,puttake 可部分并发执行
// 任务按优先级排序
BlockingQueue<Task> priorityQueue = new PriorityBlockingQueue<>(
    11, 
    Comparator.comparing(Task::getPriority).reversed()
);

典型用例

  • ✅ 定时任务调度
  • ✅ 高优先级消息优先处理
  • ✅ 算法中的优先队列(如 Dijkstra)

⚠️ 注意:元素必须实现 Comparable,或提供 Comparator,否则 put 会抛出 ClassCastException

3.4. DelayQueue

一种特殊的无界阻塞队列,只有当元素的延迟时间到期后,才能被消费

核心接口

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

使用示例:延迟任务执行

public class DelayedTask implements Delayed {
    private final long startTime;
    private final Runnable task;

    public DelayedTask(long delayInMillis, Runnable task) {
        this.startTime = System.currentTimeMillis() + delayInMillis;
        this.task = task;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.startTime, ((DelayedTask) o).startTime);
    }
}

// 使用 DelayQueue 实现定时器
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
delayQueue.put(new DelayedTask(5000, () -> System.out.println("5秒后执行")));

常见用途

  • ✅ 缓存过期清理
  • ✅ 重试机制中的退避策略
  • ✅ 模拟事件循环(类似 Node.js)

3.5. LinkedTransferQueue

它引入了独特的 transfer() 方法,实现了生产者等待消费者接收的“手递手”语义。

关键方法

  • transfer(E e):生产者阻塞,直到有消费者 take()poll() 该元素
  • tryTransfer(E e, long timeout, TimeUnit unit):带超时的 transfer
LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

// 场景:确保消息被消费
new Thread(() -> {
    try {
        System.out.println("生产者等待消费者...");
        queue.transfer("必须被消费的消息");
        System.out.println("消息已被消费,生产者继续");
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}).start();

Thread.sleep(2000);

new Thread(() -> {
    String msg = queue.take();
    System.out.println("消费者收到: " + msg);
}).start();

优势

  • ✅ 可实现背压(Backpressure):消费者控制生产速率
  • ✅ 消息传递的强保证,避免消息堆积

3.6. SynchronousQueue

一个容量为0的特殊队列,不存储元素,仅用于线程间直接传递数据。

特性

  • put() 必须等到 take() 被调用才能返回
  • ✅ 相当于一个“握手”通道,实现线程同步
  • Executors.newCachedThreadPool() 默认使用它
SynchronousQueue<String> syncQueue = new SynchronousQueue<>();

new Thread(() -> {
    try {
        System.out.println("等待传递...");
        syncQueue.put("Hello");
        System.out.println("传递完成");
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}).start();

Thread.sleep(1000);

new Thread(() -> {
    try {
        String msg = syncQueue.take();
        System.out.println("接收到: " + msg);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}).start();

💡 适合高并发短任务场景,避免中间缓冲,但使用不当易导致线程饥饿。

3.7. ConcurrentLinkedQueue

这是本文介绍的唯一非阻塞队列,基于 CAS(Compare-And-Swap)实现无锁并发。

核心特点

  • add()poll() 操作立即返回,无阻塞
  • ✅ 使用 Michael & Scott 的无锁队列算法,高性能
  • ✅ 适合高并发读写场景
ConcurrentLinkedQueue<String> clq = new ConcurrentLinkedQueue<>();
clq.add("item1");
clq.offer("item2");

String item = clq.poll(); // 空则返回 null

适用场景

  • ✅ 响应式编程(Reactive Streams)
  • ✅ 日志缓冲区
  • ✅ 高频计数器

⚠️ 注意:没有阻塞等待机制。如果消费者轮询空队列,会浪费 CPU,此时应改用阻塞队列。

4. 总结

队列类型 是否阻塞 是否有界 锁机制 典型用途
ArrayBlockingQueue 单锁 固定大小任务队列
LinkedBlockingQueue ⚠️ 双锁 通用生产者-消费者
PriorityBlockingQueue 单锁+自旋 优先级任务调度
DelayQueue 内部堆 延迟任务执行
LinkedTransferQueue 无锁+阻塞 消息传递保证
SynchronousQueue 0 手递手 线程直接通信
ConcurrentLinkedQueue CAS 无锁 高并发非阻塞场景

选择建议:

  • 要阻塞同步?BlockingQueue 族中选
  • 要优先级?PriorityBlockingQueue
  • 要延迟消费?DelayQueue
  • 要生产者等待?LinkedTransferQueue
  • 要极致性能无阻塞?ConcurrentLinkedQueue

最终决策前务必结合实际场景进行压测,避免纸上谈兵。毕竟,没有最好的队列,只有最合适的队列


原始标题:A Guide to Concurrent Queues in Java