1. 概述
Java 的 Executor 框架 核心思想是将任务提交与任务执行解耦。这种设计很好地封装了底层执行细节,但在高并发或资源受限场景下,我们仍需精细化控制线程池行为。
本文重点探讨:当线程池无法接受新任务时会发生什么?以及如何通过合理的饱和策略(Saturation Policy)来优雅处理这种“踩坑”场景。
2. 线程池工作原理回顾
下图展示了 Executor 服务的内部工作机制:
当我们向线程池提交一个新任务时,执行流程如下:
- ✅ 若有空闲线程,立即执行任务
- ✅ 否则,将任务放入内部队列等待
- ✅ 线程完成当前任务后,从队列中取出下一个任务执行
2.1. ThreadPoolExecutor 构造器解析
大多数线程池实现都基于 ThreadPoolExecutor
。理解其构造参数是掌握饱和策略的前提:
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler
)
重点关注最后两个参数:workQueue
和 handler
,它们直接决定了任务排队和拒绝行为。
2.2. 核心线程数(corePoolSize)
- 表示线程池应维持的最小线程数量
- 默认情况下,核心线程即使空闲也不会被回收
- 可通过
allowCoreThreadTimeOut(true)
改变此行为
2.3. 最大线程数(maximumPoolSize)
- 当任务队列已满且核心线程全忙时,线程池可临时创建新线程,上限为此值
- 非核心线程在空闲超过
keepAliveTime
后会被回收 - 实际线程数会在
[corePoolSize, maximumPoolSize]
范围内动态调整
2.4. 任务队列策略
队列类型直接影响线程池扩容时机和拒绝概率,主要有三种模式:
无界队列(Unbounded Queue)
- 如
LinkedBlockingQueue
(无容量限制) - 线程池永远不会扩容到最大线程数(因为队列永不饱和)
- ⚠️ 风险:可能导致内存溢出(OOM)
- 如
有界队列(Bounded Queue)
- 如
ArrayBlockingQueue
- 容量有限,满后触发线程扩容或任务拒绝
- 更可控,推荐用于生产环境
- 如
同步移交(Synchronous Handoff)
- 如
SynchronousQueue
- 队列不存储任务,必须有空闲线程才能提交成功
- 类似“直接交接”,
newCachedThreadPool()
使用此模式
- 如
饱和触发条件
当同时满足以下情况时,线程池进入饱和状态:
✅ 所有核心线程忙碌
✅ 队列已满
✅ 线程数已达 maximumPoolSize
且所有线程忙碌
此时再提交任务,就会触发 RejectedExecutionHandler 的处理逻辑。
3. 饱和策略详解
RejectedExecutionHandler
是控制任务拒绝行为的关键。Java 提供了四种内置实现,也可自定义。
3.1. Abort Policy(默认策略)✅
直接抛出 RejectedExecutionException
,简单粗暴。
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.AbortPolicy());
executor.execute(() -> waitFor(250));
assertThatThrownBy(() -> executor.execute(() -> System.out.println("Will be rejected")))
.isInstanceOf(RejectedExecutionException.class);
💡 适用场景:对数据完整性要求高,宁愿失败也不丢任务的系统。
3.2. Caller-Runs Policy(调用者运行)🔄
被拒绝的任务由提交任务的线程(即调用者线程)同步执行。
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.CallerRunsPolicy());
executor.execute(() -> waitFor(250));
long startTime = System.currentTimeMillis();
executor.execute(() -> waitFor(500));
long blockedDuration = System.currentTimeMillis() - startTime;
assertThat(blockedDuration).isGreaterThanOrEqualTo(500);
💡 优势:天然实现“背压”(Backpressure),防止生产者过快压垮消费者。
⚠️ 注意:可能阻塞业务线程,影响整体吞吐。
3.3. Discard Policy(静默丢弃)🗑️
直接丢弃新提交的任务,不抛异常。
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.DiscardPolicy());
executor.execute(() -> waitFor(100));
BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("Discarded Result"));
assertThat(queue.poll(200, MILLISECONDS)).isNull();
💡 适用场景:允许丢失非关键任务的场景,如日志采集、监控上报。
3.4. Discard-Oldest Policy(丢弃最旧任务)⏳
从队列头部移除一个最老的任务,然后重新提交当前任务。
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new ArrayBlockingQueue<>(2),
new ThreadPoolExecutor.DiscardOldestPolicy());
executor.execute(() -> waitFor(100));
BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
waitFor(150);
List<String> results = new ArrayList<>();
queue.drainTo(results);
assertThat(results).containsExactlyInAnyOrder("Second", "Third");
⚠️ 踩坑提示:不要与优先级队列(PriorityQueue)混用!否则可能误删优先级最高的任务。
3.5. 自定义饱和策略 🛠️
实现 RejectedExecutionHandler
接口即可定制逻辑。例如动态扩容:
class GrowPolicy implements RejectedExecutionHandler {
private final Lock lock = new ReentrantLock();
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
lock.lock();
try {
executor.setMaximumPoolSize(executor.getMaximumPoolSize() + 1);
} finally {
lock.unlock();
}
executor.submit(r);
}
}
使用示例:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new ArrayBlockingQueue<>(2),
new GrowPolicy());
executor.execute(() -> waitFor(100));
// 提交四个任务,全部执行成功
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
waitFor(150);
List<String> results = new ArrayList<>();
queue.drainTo(results);
assertThat(results).contains("First", "Second", "Third");
💡 注意:动态扩容需谨慎,避免无限增长导致资源耗尽。
3.6. 关闭状态下的任务拒绝 🔐
饱和策略不仅适用于“满载”,也适用于已关闭的线程池:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.shutdownNow(); // 立即关闭
assertThatThrownBy(() -> executor.execute(() -> {}))
.isInstanceOf(RejectedExecutionException.class);
正在关闭中的线程池同样会拒绝新任务:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.execute(() -> waitFor(100));
executor.shutdown(); // 发起关闭
assertThatThrownBy(() -> executor.execute(() -> {}))
.isInstanceOf(RejectedExecutionException.class);
4. 总结
合理选择 RejectedExecutionHandler
是构建高可用线程池的关键一环:
策略 | 优点 | 缺点 | 推荐场景 |
---|---|---|---|
Abort | 安全、明确 | 中断流程 | 核心交易系统 |
Caller-Runs | 实现背压 | 阻塞调用线程 | 生产者-消费者模型 |
Discard | 轻量 | 丢失任务 | 非关键异步任务 |
Discard-Oldest | 保留最新数据 | 可能丢重要任务 | 实时数据流 |
自定义 | 灵活 | 复杂度高 | 特殊业务需求 |
📌 最佳实践:生产环境避免使用无界队列 + AbortPolicy 组合,极易引发 OOM。建议使用有界队列配合 Caller-Runs 或自定义降级策略。
示例代码已托管至 GitHub:https://github.com/eugenp/tutorials/tree/master/core-java-modules/core-java-concurrency-advanced-3