1. 概述

本文深入探讨Java中的线程池机制。首先介绍标准Java库中的不同实现,随后分析Google Guava库中的扩展方案。

2. 线程池核心概念

在Java中,线程与操作系统级线程直接映射,属于系统资源。若不受控地创建线程,会快速耗尽系统资源。

操作系统通过上下文切换模拟线程并行——简单来说,线程数量越多,每个线程实际工作时间越短。

线程池模式可帮助多线程应用节省资源,并将并行度控制在预设范围内。使用线程池时,我们将并发代码编写为并行任务并提交给线程池实例执行。该实例管理多个可复用线程来执行这些任务。

线程池工作原理

该模式允许我们控制应用创建的线程数量及其生命周期,同时能调度任务执行并将待处理任务存入队列。

3. Java中的线程池实现

3.1. Executors、Executor和ExecutorService

Executors工具类包含多个创建预配置线程池实例的方法。这些类是很好的起点,当不需要自定义调优时可直接使用。

我们通过ExecutorExecutorService接口与Java中不同线程池实现交互。通常应该将代码与线程池的具体实现解耦,在整个应用中使用这些接口。

3.1.1. Executor

Executor接口只有一个execute方法,用于提交Runnable实例执行。

快速示例:使用ExecutorsAPI获取由单线程池和无界队列支持的Executor实例,顺序执行任务。

这里运行一个简单任务——在屏幕打印"Hello World"。我们以lambda形式(Java 8特性)提交任务,编译器会推断其为Runnable

Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> System.out.println("Hello World"));

3.1.2. ExecutorService

ExecutorService接口包含大量方法控制任务进度和管理服务终止。使用该接口可提交任务执行,并通过返回的Future实例控制执行过程。

现在创建ExecutorService,提交任务,然后使用返回的Futureget方法等待任务完成并获取返回值:

ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> "Hello World");
// 其他操作
String result = future.get();

实际场景中通常不会立即调用future.get(),而是推迟到真正需要计算结果时再调用。

这里重载的submit方法可接收RunnableCallable。两者都是函数式接口,可作为lambda传递(Java 8起)。

Runnable的单一方法不抛异常且无返回值。Callable接口更灵活,允许抛出异常和返回值。

要让编译器推断Callable类型,只需在lambda中返回值即可。

更多ExecutorServiceFuture的使用示例,可参考Java ExecutorService指南

3.2. ThreadPoolExecutor

ThreadPoolExecutor是可扩展的线程池实现,提供大量参数和钩子方法进行精细调优。

主要配置参数包括:corePoolSizemaximumPoolSizekeepAliveTime

线程池包含固定数量的核心线程(始终保留)和超额线程(按需创建,空闲时终止)。

  • corePoolSize:核心线程数量,实例化后保留在池中。新任务到达时,**若所有核心线程繁忙且内部队列已满,池可扩展至maximumPoolSize**。
  • keepAliveTime:超额线程(超出corePoolSize部分)的空闲存活时间。默认情况下,ThreadPoolExecutor仅考虑移除非核心线程。要对核心线程应用相同策略,可使用allowCoreThreadTimeOut(true)方法。

这些参数覆盖广泛场景,但典型配置已在Executors静态方法中预定义

3.2.1. newFixedThreadPool

示例:newFixedThreadPool方法创建ThreadPoolExecutor,其corePoolSizemaximumPoolSize相等,keepAliveTime为零。这意味着线程数量固定不变:

ThreadPoolExecutor executor = 
  (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});

assertEquals(2, executor.getPoolSize());
assertEquals(1, executor.getQueue().size());

这里创建线程数为2的固定线程池。若同时运行任务数≤2,会立即执行;否则,部分任务需排队等待

我们创建三个模拟耗时操作的Callable任务(休眠1000毫秒)。前两个立即执行,第三个进入队列等待。提交任务后立即调用getPoolSize()getQueue().size()可验证此行为。

3.2.2. Executors.newCachedThreadPool()

使用Executors.newCachedThreadPool()可创建另一预配置的ThreadPoolExecutor。该方法不接收线程数参数:设置corePoolSize为0,maximumPoolSizeInteger.MAX_VALUEkeepAliveTime为60秒:

ThreadPoolExecutor executor = 
  (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});

assertEquals(3, executor.getPoolSize());
assertEquals(0, executor.getQueue().size());

这些参数值意味着缓存线程池可无限增长以适应任意数量的提交任务,但线程空闲60秒后被回收。典型场景是应用中有大量短生命周期任务。

队列大小始终为零,因其内部使用SynchronousQueue实例。SynchronousQueue中插入和移除操作始终同时发生,队列永不包含实际元素。

3.2.3. Executors.newSingleThreadExecutor()

Executors.newSingleThreadExecutor()API创建另一种典型ThreadPoolExecutor,仅包含单个线程。单线程执行器适合创建事件循环corePoolSizemaximumPoolSize均为1,keepAliveTime为0。

示例中的任务将顺序执行,任务完成后标志位值为2:

AtomicInteger counter = new AtomicInteger();

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    counter.set(1);
});
executor.submit(() -> {
    counter.compareAndSet(1, 2);
});

此外,该ThreadPoolExecutor被不可变包装器装饰,创建后无法重新配置。注意这也是无法将其强制转换为ThreadPoolExecutor的原因。

3.3. ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承ThreadPoolExecutor类并实现ScheduledExecutorService接口,新增多个方法:

  • schedule:在指定延迟后执行一次任务
  • scheduleAtFixedRate:在初始延迟后重复执行任务,period参数测量任务开始时间间隔,执行率固定
  • scheduleWithFixedDelay:类似scheduleAtFixedRate,但指定延迟测量前一个任务结束与下一个任务开始的时间间隔,执行率可能变化

通常使用Executors.newScheduledThreadPool()创建ScheduledThreadPoolExecutor,指定corePoolSizemaximumPoolSize无界,keepAliveTime为零。

以下代码演示如何调度500毫秒后执行任务:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> {
    System.out.println("Hello World");
}, 500, TimeUnit.MILLISECONDS);

以下代码展示如何延迟500毫秒执行任务,之后每100毫秒重复一次。调度任务后,使用CountDownLatch等待其触发三次,然后通过Future.cancel()取消:

CountDownLatch lock = new CountDownLatch(3);

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
    System.out.println("Hello World");
    lock.countDown();
}, 500, 100, TimeUnit.MILLISECONDS);

lock.await(1000, TimeUnit.MILLISECONDS);
future.cancel(true);

3.4. ForkJoinPool

ForkJoinPool是Java 7引入的fork/join框架核心部分,解决递归算法中创建多个任务的常见问题。若使用简单ThreadPoolExecutor,每个任务/子任务都需要独立线程,会快速耗尽线程资源。

在fork/join框架中,任务可派生(fork)多个子任务,并通过join方法等待其完成。该框架的优势在于不为每个任务/子任务创建新线程,而是实现工作窃取算法。该框架在Java Fork/Join框架指南中有详述。

以下示例使用ForkJoinPool遍历节点树并计算所有叶子值总和。首先实现树结构:节点包含int值和子节点集合:

static class TreeNode {

    int value;

    Set<TreeNode> children;

    TreeNode(int value, TreeNode... children) {
        this.value = value;
        this.children = Sets.newHashSet(children);
    }
}

要并行计算树中所有值之和,需实现RecursiveTask<Integer>接口。每个任务接收自己的节点,将其值与子节点值之和相加。任务实现步骤如下:

  1. 流式处理children集合
  2. 映射流元素,为每个元素创建CountingTask
  3. 通过fork运行每个子任务
  4. 调用每个已派生任务的join方法收集结果
  5. 使用Collectors.summingInt收集器求和
public static class CountingTask extends RecursiveTask<Integer> {

    private final TreeNode node;

    public CountingTask(TreeNode node) {
        this.node = node;
    }

    @Override
    protected Integer compute() {
        return node.value + node.children.stream()
          .map(childNode -> new CountingTask(childNode).fork())
          .collect(Collectors.summingInt(ForkJoinTask::join));
    }
}

在具体树上运行计算的代码非常简单:

TreeNode tree = new TreeNode(5,
  new TreeNode(3), new TreeNode(2,
    new TreeNode(2), new TreeNode(8)));

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
int sum = forkJoinPool.invoke(new CountingTask(tree));

4. Guava中的线程池实现

Guava是流行的Google工具库,包含许多实用并发类,包括多个便捷的ExecutorService实现。这些实现类无法直接实例化或子类化,创建实例的唯一入口是MoreExecutors工具类。

4.1. 添加Guava Maven依赖

在Maven pom文件添加以下依赖引入Guava库。最新版本可在Maven中央仓库查找:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>33.2.1-jre</version>
</dependency>

4.2. 直接执行器和直接执行服务

有时需要根据条件决定在当前线程或线程池中运行任务。我们希望使用单一Executor接口并切换实现。虽然实现ExecutorExecutorService在当前线程运行任务不难,但仍需编写模板代码。

Guava提供了预定义实例简化操作。

示例:演示在相同线程中执行任务。尽管任务休眠500毫秒,它阻塞当前线程execute调用完成后结果立即可用:

Executor executor = MoreExecutors.directExecutor();

AtomicBoolean executed = new AtomicBoolean();

executor.execute(() -> {
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    executed.set(true);
});

assertTrue(executed.get());

directExecutor()返回的实例是静态单例,使用该方法不会产生对象创建开销。

应优先使用此方法而非MoreExecutors.newDirectExecutorService(),因为后者每次调用都会创建完整的执行器服务实现。

4.3. 退出执行服务

另一常见问题是线程池仍在运行任务时关闭虚拟机。即使有取消机制,也无法保证任务会在执行器服务关闭时优雅停止。这可能导致JVM在任务持续运行时无限挂起。

为解决此问题,Guava引入了退出执行服务系列。它们基于随JVM一同终止的守护线程

这些服务还通过Runtime.getRuntime().addShutdownHook()添加关闭钩子,并在放弃挂起任务前阻止VM终止一段配置时间。

以下示例提交包含无限循环的任务,但使用退出执行服务配置VM终止时等待任务100毫秒:

ThreadPoolExecutor executor = 
  (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
ExecutorService executorService = 
  MoreExecutors.getExitingExecutorService(executor, 
    100, TimeUnit.MILLISECONDS);

executorService.submit(() -> {
    while (true) {
    }
});

若没有exitingExecutorService,该任务会导致VM无限挂起。

4.4. 监听装饰器

监听装饰器允许包装ExecutorService,在提交任务时返回ListenableFuture实例而非简单FutureListenableFuture接口扩展Future并新增单一方法addListener,可添加在Future完成时调用的监听器。

我们很少直接使用ListenableFuture.addListener()方法,但它对Futures工具类中的大多数辅助方法至关重要

例如,使用Futures.allAsList()方法可将多个ListenableFuture合并为一个ListenableFuture,在所有Future成功完成后完成:

ExecutorService executorService = Executors.newCachedThreadPool();
ListeningExecutorService listeningExecutorService = 
  MoreExecutors.listeningDecorator(executorService);

ListenableFuture<String> future1 = 
  listeningExecutorService.submit(() -> "Hello");
ListenableFuture<String> future2 = 
  listeningExecutorService.submit(() -> "World");

String greeting = Futures.allAsList(future1, future2).get()
  .stream()
  .collect(Collectors.joining(" "));
assertEquals("Hello World", greeting);

5. 总结

本文探讨了线程池模式及其在标准Java库和Google Guava库中的实现。文章源代码可在GitHub获取。


» 下一篇: Jackson与Gson比较