1. 概述

Java 的 Executor 框架 核心思想是将任务提交与任务执行解耦。这种设计很好地封装了底层执行细节,但在高并发或资源受限场景下,我们仍需精细化控制线程池行为。

本文重点探讨:当线程池无法接受新任务时会发生什么?以及如何通过合理的饱和策略(Saturation Policy)来优雅处理这种“踩坑”场景

2. 线程池工作原理回顾

下图展示了 Executor 服务的内部工作机制:

Untitled Diagram res

当我们向线程池提交一个新任务时,执行流程如下:

  1. ✅ 若有空闲线程,立即执行任务
  2. ✅ 否则,将任务放入内部队列等待
  3. ✅ 线程完成当前任务后,从队列中取出下一个任务执行

2.1. ThreadPoolExecutor 构造器解析

大多数线程池实现都基于 ThreadPoolExecutor。理解其构造参数是掌握饱和策略的前提:

public ThreadPoolExecutor(
  int corePoolSize,
  int maximumPoolSize,
  long keepAliveTime,
  TimeUnit unit,
  BlockingQueue<Runnable> workQueue,
  RejectedExecutionHandler handler
)

重点关注最后两个参数:workQueuehandler,它们直接决定了任务排队和拒绝行为。

2.2. 核心线程数(corePoolSize)

  • 表示线程池应维持的最小线程数量
  • 默认情况下,核心线程即使空闲也不会被回收
  • 可通过 allowCoreThreadTimeOut(true) 改变此行为

2.3. 最大线程数(maximumPoolSize)

  • 当任务队列已满且核心线程全忙时,线程池可临时创建新线程,上限为此值
  • 非核心线程在空闲超过 keepAliveTime 后会被回收
  • 实际线程数会在 [corePoolSize, maximumPoolSize] 范围内动态调整

2.4. 任务队列策略

队列类型直接影响线程池扩容时机和拒绝概率,主要有三种模式:

  • 无界队列(Unbounded Queue)

    • LinkedBlockingQueue(无容量限制)
    • 线程池永远不会扩容到最大线程数(因为队列永不饱和)
    • ⚠️ 风险:可能导致内存溢出(OOM)
  • 有界队列(Bounded Queue)

    • ArrayBlockingQueue
    • 容量有限,满后触发线程扩容或任务拒绝
    • 更可控,推荐用于生产环境
  • 同步移交(Synchronous Handoff)

    • SynchronousQueue
    • 队列不存储任务,必须有空闲线程才能提交成功
    • 类似“直接交接”,newCachedThreadPool() 使用此模式

饱和触发条件

当同时满足以下情况时,线程池进入饱和状态:

✅ 所有核心线程忙碌
✅ 队列已满
✅ 线程数已达 maximumPoolSize 且所有线程忙碌

此时再提交任务,就会触发 RejectedExecutionHandler 的处理逻辑。

3. 饱和策略详解

RejectedExecutionHandler 是控制任务拒绝行为的关键。Java 提供了四种内置实现,也可自定义。

3.1. Abort Policy(默认策略)✅

直接抛出 RejectedExecutionException,简单粗暴。

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, 
  new SynchronousQueue<>(), 
  new ThreadPoolExecutor.AbortPolicy());

executor.execute(() -> waitFor(250));

assertThatThrownBy(() -> executor.execute(() -> System.out.println("Will be rejected")))
  .isInstanceOf(RejectedExecutionException.class);

💡 适用场景:对数据完整性要求高,宁愿失败也不丢任务的系统。

3.2. Caller-Runs Policy(调用者运行)🔄

被拒绝的任务由提交任务的线程(即调用者线程)同步执行

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, 
  new SynchronousQueue<>(), 
  new ThreadPoolExecutor.CallerRunsPolicy());

executor.execute(() -> waitFor(250));

long startTime = System.currentTimeMillis();
executor.execute(() -> waitFor(500));
long blockedDuration = System.currentTimeMillis() - startTime;

assertThat(blockedDuration).isGreaterThanOrEqualTo(500);

💡 优势:天然实现“背压”(Backpressure),防止生产者过快压垮消费者。
⚠️ 注意:可能阻塞业务线程,影响整体吞吐。

3.3. Discard Policy(静默丢弃)🗑️

直接丢弃新提交的任务,不抛异常。

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
  new SynchronousQueue<>(), 
  new ThreadPoolExecutor.DiscardPolicy());

executor.execute(() -> waitFor(100));

BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("Discarded Result"));

assertThat(queue.poll(200, MILLISECONDS)).isNull();

💡 适用场景:允许丢失非关键任务的场景,如日志采集、监控上报。

3.4. Discard-Oldest Policy(丢弃最旧任务)⏳

从队列头部移除一个最老的任务,然后重新提交当前任务。

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, 
  new ArrayBlockingQueue<>(2), 
  new ThreadPoolExecutor.DiscardOldestPolicy());

executor.execute(() -> waitFor(100));

BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
waitFor(150);

List<String> results = new ArrayList<>();
queue.drainTo(results);

assertThat(results).containsExactlyInAnyOrder("Second", "Third");

⚠️ 踩坑提示:不要与优先级队列(PriorityQueue)混用!否则可能误删优先级最高的任务。

3.5. 自定义饱和策略 🛠️

实现 RejectedExecutionHandler 接口即可定制逻辑。例如动态扩容:

class GrowPolicy implements RejectedExecutionHandler {

    private final Lock lock = new ReentrantLock();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        lock.lock();
        try {
            executor.setMaximumPoolSize(executor.getMaximumPoolSize() + 1);
        } finally {
            lock.unlock();
        }

        executor.submit(r);
    }
}

使用示例:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, 
  new ArrayBlockingQueue<>(2), 
  new GrowPolicy());

executor.execute(() -> waitFor(100));
// 提交四个任务,全部执行成功
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
waitFor(150);

List<String> results = new ArrayList<>();
queue.drainTo(results);
assertThat(results).contains("First", "Second", "Third");

💡 注意:动态扩容需谨慎,避免无限增长导致资源耗尽。

3.6. 关闭状态下的任务拒绝 🔐

饱和策略不仅适用于“满载”,也适用于已关闭的线程池

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.shutdownNow(); // 立即关闭

assertThatThrownBy(() -> executor.execute(() -> {}))
  .isInstanceOf(RejectedExecutionException.class);

正在关闭中的线程池同样会拒绝新任务:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.execute(() -> waitFor(100));
executor.shutdown(); // 发起关闭

assertThatThrownBy(() -> executor.execute(() -> {}))
  .isInstanceOf(RejectedExecutionException.class);

4. 总结

合理选择 RejectedExecutionHandler 是构建高可用线程池的关键一环:

策略 优点 缺点 推荐场景
Abort 安全、明确 中断流程 核心交易系统
Caller-Runs 实现背压 阻塞调用线程 生产者-消费者模型
Discard 轻量 丢失任务 非关键异步任务
Discard-Oldest 保留最新数据 可能丢重要任务 实时数据流
自定义 灵活 复杂度高 特殊业务需求

📌 最佳实践:生产环境避免使用无界队列 + AbortPolicy 组合,极易引发 OOM。建议使用有界队列配合 Caller-Runs 或自定义降级策略。

示例代码已托管至 GitHub:https://github.com/eugenp/tutorials/tree/master/core-java-modules/core-java-concurrency-advanced-3


原始标题:Guide to RejectedExecutionHandler