1. 概述
java.util.concurrent
包为构建并发应用程序提供了强大的工具集。本文将全面梳理该包的核心组件,帮助开发者高效掌握并发编程的关键技术。
2. 核心组件
java.util.concurrent
包含大量实用工具,本文聚焦最常用的核心组件:
-
Executor
-
ExecutorService
-
ScheduledExecutorService
-
Future
-
CountDownLatch
-
CyclicBarrier
-
Semaphore
-
ThreadFactory
-
BlockingQueue
-
DelayQueue
-
Locks
-
Phaser
各组件的深度解析可参考 Baeldung 专题文章。
2.1. Executor
Executor
是一个接口,用于执行提交的任务。任务是在新线程还是当前线程执行,取决于具体实现。通过该接口,我们可以将任务执行流程与实际执行机制解耦。
⚠️ 注意:Executor
不强制要求异步执行。最简单的实现可能直接在调用线程中同步执行任务。
创建执行器示例:
public class Invoker implements Executor {
@Override
public void execute(Runnable r) {
r.run();
}
}
使用执行器提交任务:
public void execute() {
Executor executor = new Invoker();
executor.execute( () -> {
// 任务逻辑
});
}
❌ 如果执行器无法接受任务,将抛出 RejectedExecutionException
。
2.2. ExecutorService
ExecutorService
提供了完整的异步处理解决方案,通过内存队列管理任务,并根据线程可用性调度执行。
首先定义任务类:
public class Task implements Runnable {
@Override
public void run() {
// 任务细节
}
}
创建线程池并提交任务:
ExecutorService executor = Executors.newFixedThreadPool(10);
public void execute() {
executor.submit(new Task());
}
// 或使用 Lambda 提交
executor.submit(() -> {
new Task();
});
✅ 终止方法对比:
| 方法 | 行为 |
|------|------|
| shutdown()
| 等待所有任务完成后关闭 |
| shutdownNow()
| 尝试终止所有任务,停止等待任务处理 |
阻塞等待任务完成:
try {
executor.awaitTermination(20l, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
2.3. ScheduledExecutorService
ScheduledExecutorService
在 ExecutorService
基础上增加了周期性任务调度能力。
Executor
和ExecutorService
的方法都是即时执行,零或负延迟值表示立即执行。
使用 Runnable
和 Callable
定义任务:
public void execute() {
ScheduledExecutorService executorService
= Executors.newSingleThreadScheduledExecutor();
Future<String> future = executorService.schedule(() -> {
// ...
return "Hello world";
}, 1, TimeUnit.SECONDS);
ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
// ...
}, 1, TimeUnit.SECONDS);
executorService.shutdown();
}
固定延迟调度对比:
// 固定频率调度(不受任务执行时间影响)
executorService.scheduleAtFixedRate(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);
// 固定延迟调度(基于上一次任务完成时间)
executorService.scheduleWithFixedDelay(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);
2.4. Future
Future
代表异步操作的结果,提供检查操作状态、获取计算结果等方法。
核心方法:
cancel(boolean mayInterruptIfRunning)
:取消操作isDone()
:检查是否完成get()
:获取结果(阻塞)
创建并使用 Future
:
public void invoke() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> {
// ...
Thread.sleep(10000l);
return "Hello world";
});
}
// 检查状态并获取结果
if (future.isDone() && !future.isCancelled()) {
try {
str = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
// 带超时的获取
try {
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
2.5. CountDownLatch
CountDownLatch
(JDK 5 引入)使一组线程阻塞,直到某个操作完成。初始化时指定计数器,线程完成时递减计数器,计数器归零后释放所有等待线程。
深度解析参考:Java CountDownLatch 指南
2.6. CyclicBarrier
CyclicBarrier
与 CountDownLatch
类似,但可重复使用。它允许多个线程在屏障条件(await()
)处相互等待,然后执行最终任务。
定义屏障任务:
public class Task implements Runnable {
private CyclicBarrier barrier;
public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
LOG.info(Thread.currentThread().getName() +
" is waiting");
barrier.await();
LOG.info(Thread.currentThread().getName() +
" is released");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
启动多线程等待屏障:
public void start() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
// ...
LOG.info("All previous tasks are completed");
});
Thread t1 = new Thread(new Task(cyclicBarrier), "T1");
Thread t2 = new Thread(new Task(cyclicBarrier), "T2");
Thread t3 = new Thread(new Task(cyclicBarrier), "T3");
if (!cyclicBarrier.isBroken()) {
t1.start();
t2.start();
t3.start();
}
}
⚠️ 使用 isBroken()
检查线程是否中断,避免后续处理异常。
2.7. Semaphore
Semaphore
控制对物理/逻辑资源的线程级访问。它维护一组许可证,线程进入临界区前需检查许可证:
static Semaphore semaphore = new Semaphore(10);
public void execute() throws InterruptedException {
LOG.info("Available permit : " + semaphore.availablePermits());
LOG.info("Number of threads waiting to acquire: " +
semaphore.getQueueLength());
if (semaphore.tryAcquire()) {
try {
// 临界区代码
} finally {
semaphore.release();
}
}
}
关键特性:
tryAcquire()
:非阻塞获取许可证release()
:释放许可证tryAcquire(long timeout, TimeUnit unit)
:带超时获取
可实现类似
Mutex
的数据结构,详见:Java Semaphore 实践
2.8. ThreadFactory
ThreadFactory
按需创建新线程,消除线程创建的样板代码。
自定义线程工厂:
public class BaeldungThreadFactory implements ThreadFactory {
private int threadId;
private String name;
public BaeldungThreadFactory(String name) {
threadId = 1;
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, name + "-Thread_" + threadId);
LOG.info("created new thread with id : " + threadId +
" and name : " + t.getName());
threadId++;
return t;
}
}
使用工厂创建线程:
BaeldungThreadFactory factory = new BaeldungThreadFactory(
"BaeldungThreadFactory");
for (int i = 0; i < 10; i++) {
Thread t = factory.newThread(new Task());
t.start();
}
2.9. BlockingQueue
BlockingQueue
是生产者-消费者模式的核心数据结构,常用于异步编程场景。
深度解析参考:Java BlockingQueue 指南
2.10. DelayQueue
DelayQueue
是无界阻塞队列,元素只有过期时间到达后才能被取出。队首元素是延迟最长的元素,最后被轮询。
深度解析参考:Java DelayQueue 实践
2.11. Locks
Lock
工具用于阻止其他线程访问代码段。与 synchronized
块的关键区别:
synchronized
作用域限于方法内Lock
的lock()
和unlock()
可在不同方法中调用
深度解析参考:Java 并发锁机制
2.12. Phaser
Phaser
是比 CyclicBarrier
和 CountDownLatch
更灵活的解决方案,支持动态数量的线程在可重用屏障上等待。可协调多个执行阶段,每个阶段复用 Phaser
实例。
深度解析参考:Java Phaser 高级用法
3. 总结
本文系统梳理了 java.util.concurrent
包的核心组件及其典型应用场景。掌握这些工具能显著提升并发编程的效率与可靠性。
完整源码参考:GitHub 仓库