1. 概述

本文深入探讨 Java 中的 Work Stealing(工作窃取)机制,这是提升多线程应用性能的关键技术之一。通过 Fork/Join 框架实现的任务调度策略,能有效降低线程竞争,提升 CPU 利用率。

对于并发编程老手来说,理解底层调度逻辑远比只会用 ExecutorService 更有价值。本文将带你从原理到实战,彻底搞懂 Work Stealing 的运作方式。


2. 什么是 Work Stealing?

Work Stealing 的核心目标是:✅ 减少多线程环境下的任务竞争,提升并行处理效率。它在 Java 中通过 Fork/Join 框架 实现,特别适合“分而治之”类的计算密集型任务。

2.1 分治策略(Divide and Conquer)

Fork/Join 的本质是递归地将大任务拆解为小任务,各自执行后再合并结果。典型的伪代码结构如下:

Result solve(Problem problem) {
    if (problem is small)
        directly solve problem
    else {
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
    }
}

这种模式天然适合并行化——每个子任务独立,可由不同线程处理。

2.2 工作线程与双端队列

任务拆解后,由线程池中的 worker thread(工作线程) 执行。每个线程维护一个 双端队列(deque),用于存放自己的任务。

关键行为如下:

  • 本地执行:线程优先从自己 deque 的头部(top)后进先出(LIFO) 地取任务执行,这有利于缓存局部性。
  • 工作窃取:当自身队列为空时,线程会随机选择一个“富裕”线程(victim),从其 deque 的尾部(tail)先进先出(FIFO) 窃取任务。

⚠️ 这种“头出尾偷”的设计是 Work Stealing 的精髓——窃取者和所有者操作 deque 的不同端,极大减少了锁竞争


3. Fork/Join 框架实现

Java 提供了两种方式创建支持 Work Stealing 的线程池:

// 方式一:直接使用 ForkJoinPool 的公共池
ForkJoinPool commonPool = ForkJoinPool.commonPool();

// 方式二:通过 Executors 工具类创建
ExecutorService workStealingPool = Executors.newWorkStealingPool();

Executors.newWorkStealingPool() 实际是对 ForkJoinPool 的封装,但它默认创建的是 异步模式(async mode) 的池,而 commonPool 默认是同步模式。

你也可以指定并行度:

// 指定并行级别
ExecutorService customPool = Executors.newWorkStealingPool(4);

4. 同步 vs 异步线程池

两者的核心区别在于任务调度队列的模式:

线程池类型 队列模式 特点
ForkJoinPool.commonPool() LIFO(后进先出) 本地任务优先执行,适合需要 join() 等待结果的场景
Executors.newWorkStealingPool() FIFO(先进先出) 更利于工作窃取,适合事件驱动型任务

根据 Doug Lea 的论文,FIFO 模式在 Work Stealing 上有两大优势:

  1. 降低竞争:窃取者从尾部拿,所有者从头部拿,互不干扰。
  2. 利于任务再分解:早期生成的大任务更容易被窃取后进一步拆分,提升并行度。

📌 官方文档建议:asyncMode=true 适用于 从不调用 join() 的事件风格任务(event-style tasks)。


5. 实战示例:查找质数

我们通过一个计算密集型任务——在指定范围内查找质数,来对比不同线程池的性能差异。

5.1 质数查找任务实现

public class PrimeNumbers extends RecursiveAction {

    private int lowerBound;
    private int upperBound;
    private int granularity;
    static final List<Integer> GRANULARITIES
      = Arrays.asList(1, 10, 100, 1000, 10000);
    private AtomicInteger noOfPrimeNumbers;

    PrimeNumbers(int lowerBound, int upperBound, int granularity, AtomicInteger noOfPrimeNumbers) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.granularity = granularity;
        this.noOfPrimeNumbers = noOfPrimeNumbers;
    }

    private List<PrimeNumbers> subTasks() {
        List<PrimeNumbers> subTasks = new ArrayList<>();

        for (int i = 1; i <= this.upperBound / granularity; i++) {
            int upper = i * granularity;
            int lower = (upper - granularity) + 1;
            subTasks.add(new PrimeNumbers(lower, upper, noOfPrimeNumbers));
        }
        return subTasks;
    }

    @Override
    protected void compute() {
        if (((upperBound + 1) - lowerBound) > granularity) {
            ForkJoinTask.invokeAll(subTasks());
        } else {
            findPrimeNumbers();
        }
    }

    void findPrimeNumbers() {
        for (int num = lowerBound; num <= upperBound; num++) {
            if (isPrime(num)) {
                noOfPrimeNumbers.getAndIncrement();
            }
        }
    }

    public int noOfPrimeNumbers() {
        return noOfPrimeNumbers.intValue();
    }
}

关键点说明:

  • ✅ 继承 RecursiveAction,实现 compute() 方法,支持 fork/join 调度。
  • granularity 控制任务拆分粒度,越小拆分越细。
  • ✅ 使用 AtomicInteger 线程安全地统计质数个数。

5.2 性能对比:单线程 vs Work Stealing

我们对比三种执行方式:

单线程执行

PrimeNumbers primes = new PrimeNumbers(1, 10000, 100, new AtomicInteger());
primes.findPrimeNumbers();

使用 ForkJoinPool.commonPool

PrimeNumbers primes = new PrimeNumbers(1, 10000, 100, new AtomicInteger());
ForkJoinPool pool = ForkJoinPool.commonPool();
pool.invoke(primes);
pool.shutdown();

使用 Executors.newWorkStealingPool

PrimeNumbers primes = new PrimeNumbers(1, 10000, 100, new AtomicInteger());
int parallelism = ForkJoinPool.getCommonPoolParallelism();
ForkJoinPool stealer = (ForkJoinPool) Executors.newWorkStealingPool(parallelism);
stealer.invoke(primes);
stealer.shutdown();

使用 JMH(Java Microbenchmark Harness)测试结果如下:

# Run complete. Total time: 00:04:50

Benchmark                                                      Mode  Cnt    Score   Error  Units
PrimeNumbersUnitTest.Benchmarker.commonPoolBenchmark           avgt   20  119.885 ± 9.917  ms/op
PrimeNumbersUnitTest.Benchmarker.newWorkStealingPoolBenchmark  avgt   20  119.791 ± 7.811  ms/op
PrimeNumbersUnitTest.Benchmarker.singleThread                  avgt   20  475.964 ± 7.929  ms/op

✅ 明显看出:两种 Work Stealing 方式性能接近,比单线程快约 4 倍,充分发挥了多核优势。

5.3 工作窃取行为分析

通过 ForkJoinPool.getStealCount() 可以观察实际发生的工作窃取次数:

Executors.newWorkStealingPool(异步模式)

Granularity: [1], Steals: [6564]
Granularity: [10], Steals: [572]
Granularity: [100], Steals: [56]
Granularity: [1000], Steals: [60]
Granularity: [10000], Steals: [1]

ForkJoinPool.commonPool(同步模式)

Granularity: [1], Steals: [6923]
Granularity: [10], Steals: [7540]
Granularity: [100], Steals: [7605]
Granularity: [1000], Steals: [7681]
Granularity: [10000], Steals: [7681]

关键观察:

  • newWorkStealingPool窃取次数随粒度变粗显著下降。任务不分拆时(granularity=10000),几乎无窃取行为。
  • commonPool无论粒度如何,窃取次数都维持高位,说明其调度策略更激进,即使任务已足够大也频繁尝试窃取。

📌 结论:对于无需 join 的独立计算任务(如本例),Executors.newWorkStealingPool 资源利用率更高,行为更可控,是更优选择。


6. 总结

本文系统解析了 Java 中的 Work Stealing 机制,核心要点如下:

  • ✅ Work Stealing 通过 双端队列 + 头出尾偷 策略,有效降低线程竞争。
  • Fork/Join 框架是其实现基础,适合 分治类计算密集型任务
  • Executors.newWorkStealingPool 使用 FIFO 模式,更适合 事件驱动、无需 join 的场景
  • ForkJoinPool.commonPool 使用 LIFO,适合需要 join 的递归任务。
  • ✅ 实际选型需结合任务特性:粒度、是否需要结果合并、是否为事件风格

示例代码已托管至 GitHub:https://github.com/baeldung/core-java-modules/tree/master/core-java-concurrency-advanced-3


原始标题:Guide to Work Stealing in Java | Baeldung