1. 概述

java.util.concurrent 包为构建并发应用程序提供了强大的工具集。本文将全面梳理该包的核心组件,帮助开发者高效掌握并发编程的关键技术。

2. 核心组件

java.util.concurrent 包含大量实用工具,本文聚焦最常用的核心组件:

  • Executor
  • ExecutorService
  • ScheduledExecutorService
  • Future
  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • ThreadFactory
  • BlockingQueue
  • DelayQueue
  • Locks
  • Phaser

各组件的深度解析可参考 Baeldung 专题文章。

2.1. Executor

Executor 是一个接口,用于执行提交的任务。任务是在新线程还是当前线程执行,取决于具体实现。通过该接口,我们可以将任务执行流程与实际执行机制解耦。

⚠️ 注意:Executor 不强制要求异步执行。最简单的实现可能直接在调用线程中同步执行任务。

创建执行器示例:

public class Invoker implements Executor {
    @Override
    public void execute(Runnable r) {
        r.run();
    }
}

使用执行器提交任务:

public void execute() {
    Executor executor = new Invoker();
    executor.execute( () -> {
        // 任务逻辑
    });
}

❌ 如果执行器无法接受任务,将抛出 RejectedExecutionException

2.2. ExecutorService

ExecutorService 提供了完整的异步处理解决方案,通过内存队列管理任务,并根据线程可用性调度执行。

首先定义任务类:

public class Task implements Runnable {
    @Override
    public void run() {
        // 任务细节
    }
}

创建线程池并提交任务:

ExecutorService executor = Executors.newFixedThreadPool(10);

public void execute() { 
    executor.submit(new Task()); 
}

// 或使用 Lambda 提交
executor.submit(() -> {
    new Task();
});

✅ 终止方法对比: | 方法 | 行为 | |------|------| | shutdown() | 等待所有任务完成后关闭 | | shutdownNow() | 尝试终止所有任务,停止等待任务处理 |

阻塞等待任务完成:

try {
    executor.awaitTermination(20l, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
    e.printStackTrace();
}

2.3. ScheduledExecutorService

ScheduledExecutorServiceExecutorService 基础上增加了周期性任务调度能力。

ExecutorExecutorService 的方法都是即时执行,零或负延迟值表示立即执行。

使用 RunnableCallable 定义任务:

public void execute() {
    ScheduledExecutorService executorService
      = Executors.newSingleThreadScheduledExecutor();

    Future<String> future = executorService.schedule(() -> {
        // ...
        return "Hello world";
    }, 1, TimeUnit.SECONDS);

    ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
        // ...
    }, 1, TimeUnit.SECONDS);

    executorService.shutdown();
}

固定延迟调度对比:

// 固定频率调度(不受任务执行时间影响)
executorService.scheduleAtFixedRate(() -> {
    // ...
}, 1, 10, TimeUnit.SECONDS);

// 固定延迟调度(基于上一次任务完成时间)
executorService.scheduleWithFixedDelay(() -> {
    // ...
}, 1, 10, TimeUnit.SECONDS);

2.4. Future

Future 代表异步操作的结果,提供检查操作状态、获取计算结果等方法。

核心方法:

  • cancel(boolean mayInterruptIfRunning):取消操作
  • isDone():检查是否完成
  • get():获取结果(阻塞)

创建并使用 Future

public void invoke() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);

    Future<String> future = executorService.submit(() -> {
        // ...
        Thread.sleep(10000l);
        return "Hello world";
    });
}

// 检查状态并获取结果
if (future.isDone() && !future.isCancelled()) {
    try {
        str = future.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

// 带超时的获取
try {
    future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}

2.5. CountDownLatch

CountDownLatch(JDK 5 引入)使一组线程阻塞,直到某个操作完成。初始化时指定计数器,线程完成时递减计数器,计数器归零后释放所有等待线程。

深度解析参考:Java CountDownLatch 指南

2.6. CyclicBarrier

CyclicBarrierCountDownLatch 类似,但可重复使用。它允许多个线程在屏障条件(await())处相互等待,然后执行最终任务。

定义屏障任务:

public class Task implements Runnable {
    private CyclicBarrier barrier;

    public Task(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            LOG.info(Thread.currentThread().getName() + 
              " is waiting");
            barrier.await();
            LOG.info(Thread.currentThread().getName() + 
              " is released");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

启动多线程等待屏障:

public void start() {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
        // ...
        LOG.info("All previous tasks are completed");
    });

    Thread t1 = new Thread(new Task(cyclicBarrier), "T1"); 
    Thread t2 = new Thread(new Task(cyclicBarrier), "T2"); 
    Thread t3 = new Thread(new Task(cyclicBarrier), "T3"); 

    if (!cyclicBarrier.isBroken()) { 
        t1.start(); 
        t2.start(); 
        t3.start(); 
    }
}

⚠️ 使用 isBroken() 检查线程是否中断,避免后续处理异常。

2.7. Semaphore

Semaphore 控制对物理/逻辑资源的线程级访问。它维护一组许可证,线程进入临界区前需检查许可证:

static Semaphore semaphore = new Semaphore(10);

public void execute() throws InterruptedException {
    LOG.info("Available permit : " + semaphore.availablePermits());
    LOG.info("Number of threads waiting to acquire: " + 
      semaphore.getQueueLength());

    if (semaphore.tryAcquire()) {
        try {
            // 临界区代码
        } finally {
            semaphore.release();
        }
    }
}

关键特性:

  • tryAcquire():非阻塞获取许可证
  • release():释放许可证
  • tryAcquire(long timeout, TimeUnit unit):带超时获取

可实现类似 Mutex 的数据结构,详见:Java Semaphore 实践

2.8. ThreadFactory

ThreadFactory 按需创建新线程,消除线程创建的样板代码。

自定义线程工厂:

public class BaeldungThreadFactory implements ThreadFactory {
    private int threadId;
    private String name;

    public BaeldungThreadFactory(String name) {
        threadId = 1;
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, name + "-Thread_" + threadId);
        LOG.info("created new thread with id : " + threadId +
            " and name : " + t.getName());
        threadId++;
        return t;
    }
}

使用工厂创建线程:

BaeldungThreadFactory factory = new BaeldungThreadFactory( 
    "BaeldungThreadFactory");
for (int i = 0; i < 10; i++) { 
    Thread t = factory.newThread(new Task());
    t.start(); 
}

2.9. BlockingQueue

BlockingQueue 是生产者-消费者模式的核心数据结构,常用于异步编程场景。

深度解析参考:Java BlockingQueue 指南

2.10. DelayQueue

DelayQueue 是无界阻塞队列,元素只有过期时间到达后才能被取出。队首元素是延迟最长的元素,最后被轮询。

深度解析参考:Java DelayQueue 实践

2.11. Locks

Lock 工具用于阻止其他线程访问代码段。与 synchronized 块的关键区别:

  • synchronized 作用域限于方法内
  • Locklock()unlock() 可在不同方法中调用

深度解析参考:Java 并发锁机制

2.12. Phaser

Phaser 是比 CyclicBarrierCountDownLatch 更灵活的解决方案,支持动态数量的线程在可重用屏障上等待。可协调多个执行阶段,每个阶段复用 Phaser 实例。

深度解析参考:Java Phaser 高级用法

3. 总结

本文系统梳理了 java.util.concurrent 包的核心组件及其典型应用场景。掌握这些工具能显著提升并发编程的效率与可靠性。

完整源码参考:GitHub 仓库


原始标题:Overview of the java.util.concurrent | Baeldung

« 上一篇: PDFUnit 使用指南
» 下一篇: Cucumber Java 8 支持