1. 概述
本文将深入介绍 Java 中几种核心的并发队列实现。如果你对队列的基础概念还不熟悉,可以先阅读我们的 Java Queue 接口指南。
并发队列在多线程编程中扮演着至关重要的角色,选对队列类型,往往能直接决定系统的吞吐和稳定性。我们不仅会对比阻塞与非阻塞队列的差异,还会结合实际场景分析每种实现的适用性。
2. 阻塞队列 vs 非阻塞队列
BlockingQueue
提供了一种简单且线程安全的同步机制。它的核心特点是:当队列满时,生产者线程会被阻塞;当队列空时,消费者线程也会被阻塞,直到有新元素可用。
相比之下,非阻塞队列(如 ConcurrentLinkedQueue
)在操作失败时不会阻塞线程,而是立即返回特殊值(如 null
或 false
),或者抛出异常。
为了实现阻塞语义,BlockingQueue
在标准 Queue
接口的基础上扩展了两个关键方法:
- ✅
put(E e)
:阻塞式入队,队列满时等待 - ✅
take()
:阻塞式出队,队列空时等待
这两个方法是构建高可靠生产者-消费者模型的基石。
3. 并发队列实现类详解
3.1. ArrayBlockingQueue
顾名思义,底层使用数组存储元素,因此是一个有界队列(必须指定容量)。
适用场景
- ✅ 工作任务分发系统(如线程池任务队列)
- ✅ 内存敏感场景,容量上限可防止 OOM
核心特性
- ⚠️ 使用单锁(ReentrantLock)控制
put
和take
,意味着生产和消费不能完全并行,存在竞争 - ✅ 预分配数组内存,减少运行时开销,但可能导致内存浪费(例如大容量队列长期空闲)
- ❌ 不支持
null
元素
// 创建一个容量为10的有界队列
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
💡 踩坑提醒:由于是单锁设计,在高并发场景下可能成为性能瓶颈,注意监控锁竞争。
3.2. LinkedBlockingQueue
基于链表结构实现,节点动态创建和回收。
容量控制
- 默认容量为
Integer.MAX_VALUE
(≈ 21亿),接近“无界” - 也可通过构造函数指定上限:
new LinkedBlockingQueue<>(1000)
核心优势
- ✅ 使用双锁分离:
putLock
和takeLock
,生产与消费可并行执行,显著提升吞吐 - ✅ 更适合元素频繁增删的场景
// 指定容量的链表队列
BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>(1024);
// 无界模式(慎用!)
BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
性能对比 ArrayBlockingQueue
维度 | ArrayBlockingQueue |
LinkedBlockingQueue |
---|---|---|
内存 | 预分配,可能浪费 | 动态分配,更灵活 |
吞吐 | 单锁,较低 | 双锁,并发高 |
GC 压力 | 低 | 高(频繁创建对象) |
💡 简单粗暴选型建议:如果队列大小可控且追求极致性能,用
ArrayBlockingQueue
;否则优先考虑LinkedBlockingQueue
。
3.3. PriorityBlockingQueue
当需要按优先级处理任务时,它是首选。
实现原理
- 底层是基于数组的二叉堆(最小堆)
- 入队
put()
是O(log n)
,出队take()
也是O(log n)
- 虽然内部使用单锁,但通过自旋优化,
put
和take
可部分并发执行
// 任务按优先级排序
BlockingQueue<Task> priorityQueue = new PriorityBlockingQueue<>(
11,
Comparator.comparing(Task::getPriority).reversed()
);
典型用例
- ✅ 定时任务调度
- ✅ 高优先级消息优先处理
- ✅ 算法中的优先队列(如 Dijkstra)
⚠️ 注意:元素必须实现 Comparable
,或提供 Comparator
,否则 put
会抛出 ClassCastException
。
3.4. DelayQueue
一种特殊的无界阻塞队列,只有当元素的延迟时间到期后,才能被消费。
核心接口
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
使用示例:延迟任务执行
public class DelayedTask implements Delayed {
private final long startTime;
private final Runnable task;
public DelayedTask(long delayInMillis, Runnable task) {
this.startTime = System.currentTimeMillis() + delayInMillis;
this.task = task;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.startTime, ((DelayedTask) o).startTime);
}
}
// 使用 DelayQueue 实现定时器
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
delayQueue.put(new DelayedTask(5000, () -> System.out.println("5秒后执行")));
常见用途
- ✅ 缓存过期清理
- ✅ 重试机制中的退避策略
- ✅ 模拟事件循环(类似 Node.js)
3.5. LinkedTransferQueue
它引入了独特的 transfer()
方法,实现了生产者等待消费者接收的“手递手”语义。
关键方法
- ✅
transfer(E e)
:生产者阻塞,直到有消费者take()
或poll()
该元素 - ✅
tryTransfer(E e, long timeout, TimeUnit unit)
:带超时的 transfer
LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
// 场景:确保消息被消费
new Thread(() -> {
try {
System.out.println("生产者等待消费者...");
queue.transfer("必须被消费的消息");
System.out.println("消息已被消费,生产者继续");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
Thread.sleep(2000);
new Thread(() -> {
String msg = queue.take();
System.out.println("消费者收到: " + msg);
}).start();
优势
- ✅ 可实现背压(Backpressure):消费者控制生产速率
- ✅ 消息传递的强保证,避免消息堆积
3.6. SynchronousQueue
一个容量为0的特殊队列,不存储元素,仅用于线程间直接传递数据。
特性
- ✅
put()
必须等到take()
被调用才能返回 - ✅ 相当于一个“握手”通道,实现线程同步
- ✅
Executors.newCachedThreadPool()
默认使用它
SynchronousQueue<String> syncQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println("等待传递...");
syncQueue.put("Hello");
System.out.println("传递完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
Thread.sleep(1000);
new Thread(() -> {
try {
String msg = syncQueue.take();
System.out.println("接收到: " + msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
💡 适合高并发短任务场景,避免中间缓冲,但使用不当易导致线程饥饿。
3.7. ConcurrentLinkedQueue
这是本文介绍的唯一非阻塞队列,基于 CAS(Compare-And-Swap)实现无锁并发。
核心特点
- ✅
add()
和poll()
操作立即返回,无阻塞 - ✅ 使用 Michael & Scott 的无锁队列算法,高性能
- ✅ 适合高并发读写场景
ConcurrentLinkedQueue<String> clq = new ConcurrentLinkedQueue<>();
clq.add("item1");
clq.offer("item2");
String item = clq.poll(); // 空则返回 null
适用场景
- ✅ 响应式编程(Reactive Streams)
- ✅ 日志缓冲区
- ✅ 高频计数器
⚠️ 注意:没有阻塞等待机制。如果消费者轮询空队列,会浪费 CPU,此时应改用阻塞队列。
4. 总结
队列类型 | 是否阻塞 | 是否有界 | 锁机制 | 典型用途 |
---|---|---|---|---|
ArrayBlockingQueue |
✅ | ✅ | 单锁 | 固定大小任务队列 |
LinkedBlockingQueue |
✅ | ⚠️ | 双锁 | 通用生产者-消费者 |
PriorityBlockingQueue |
✅ | ❌ | 单锁+自旋 | 优先级任务调度 |
DelayQueue |
✅ | ❌ | 内部堆 | 延迟任务执行 |
LinkedTransferQueue |
✅ | ❌ | 无锁+阻塞 | 消息传递保证 |
SynchronousQueue |
✅ | 0 | 手递手 | 线程直接通信 |
ConcurrentLinkedQueue |
❌ | ❌ | CAS 无锁 | 高并发非阻塞场景 |
选择建议:
- ✅ 要阻塞同步? 从
BlockingQueue
族中选 - ✅ 要优先级? 用
PriorityBlockingQueue
- ✅ 要延迟消费? 用
DelayQueue
- ✅ 要生产者等待? 用
LinkedTransferQueue
- ✅ 要极致性能无阻塞? 用
ConcurrentLinkedQueue
最终决策前务必结合实际场景进行压测,避免纸上谈兵。毕竟,没有最好的队列,只有最合适的队列。