2. 1. 概述
本文将深入探讨java.util.concurrent
包中的Phaser工具类。它与CountDownLatch类似,都是用于线程协调的并发控制工具,但提供了更灵活的功能。
Phaser本质上是一个动态可调的同步屏障,允许线程在屏障处等待,直到所有参与者都到达后才能继续执行。与CountDownLatch最大的区别在于:
- ✅ 动态调整参与者数量:可以在运行时增减线程数
- ❌ CountDownLatch:必须在创建时指定固定线程数
2. 2. Phaser API详解
Phaser的核心机制是多阶段协调,每个阶段可以有不同的线程参与。关键API包括:
核心方法
register()
线程注册参与当前阶段,增加参与者计数phaser.register(); // 当前线程注册
arriveAndAwaitAdvance()
到达屏障并等待其他线程(阻塞方法)phaser.arriveAndAwaitAdvance(); // 等待直到所有线程到达
arriveAndDeregister()
完成任务后注销当前线程phaser.arriveAndDeregister(); // 完成后注销
getPhase()
获取当前阶段号int currentPhase = phaser.getPhase();
工作流程
graph TD
A[线程注册] --> B[到达屏障]
B --> C{所有线程到达?}
C -->|否| D[继续等待]
C -->|是| E[阶段推进]
E --> F[线程注销]
⚠️ 注意:注册后无法直接检查某线程是否已注册,需要子类化才能支持此功能。
2. 3. 使用Phaser API实现逻辑
假设需要协调两个执行阶段:
- 阶段1:3个线程处理
- 阶段2:2个线程处理
实现一个任务类LongRunningAction
:
class LongRunningAction implements Runnable {
private String threadName;
private Phaser ph;
LongRunningAction(String threadName, Phaser ph) {
this.threadName = threadName;
this.ph = ph;
this.randomWait(); // 模拟初始化耗时
ph.register(); // 注册参与
}
@Override
public void run() {
// 阶段1:等待所有线程到达
ph.arriveAndAwaitAdvance();
// 模拟任务执行
randomWait();
// 完成后注销
ph.arriveAndDeregister();
}
// 模拟真实工作负载
private void randomWait() {
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
关键点说明
- 初始化时注册:构造函数中调用
register()
- 屏障等待:
arriveAndAwaitAdvance()
实现阶段同步 - 资源清理:完成后调用
arriveAndDeregister()
- 随机延迟:
randomWait()
模拟真实场景的耗时差异
2. 4. 测试Phaser API
测试场景:先启动3个线程执行阶段1,再启动2个线程执行阶段2。
初始化设置
// 创建Phaser,主线程作为协调者
Phaser ph = new Phaser(1);
assertEquals(0, ph.getPhase()); // 初始阶段为0
💡 为什么参数是1?
主线程作为协调者参与计数,所以初始注册数=1(主线程)+3(工作线程)=4
阶段1:3个线程执行
// 启动3个工作线程
new Thread(new LongRunningAction("thread-1", ph)).start();
new Thread(new LongRunningAction("thread-2", ph)).start();
new Thread(new LongRunningAction("thread-3", ph)).start();
// 主线程等待(第4个参与者)
ph.arriveAndAwaitAdvance();
assertEquals(1, ph.getPhase()); // 阶段推进到1
阶段2:2个线程执行
// 动态调整:新增2个线程
new Thread(new LongRunningAction("thread-4", ph)).start();
new Thread(new LongRunningAction("thread-5", ph)).start();
// 主线程再次等待
ph.arriveAndAwaitAdvance();
assertEquals(2, ph.getPhase()); // 阶段推进到2
// 主线程注销
ph.arriveAndDeregister();
执行输出示例
Thread thread-1 registered during phase 0
Thread thread-2 registered during phase 0
Thread thread-3 registered during phase 0
Thread main waiting for others
Thread thread-1 BEFORE long running action in phase 0
Thread thread-2 BEFORE long running action in phase 0
Thread thread-3 BEFORE long running action in phase 0
Thread main proceeding in phase 1
Thread thread-4 registered during phase 1
Thread thread-5 registered during phase 1
Thread thread-4 BEFORE long running action in phase 1
Thread thread-5 BEFORE long running action in phase 1
Thread main waiting for new phase
Thread main proceeding in phase 2
Thread thread-4 AFTER long running action in phase 2
Thread thread-5 AFTER long running action in phase 2
2. 5. 总结
Phaser是Java并发工具箱中的利器,特别适合需要多阶段协调的场景:
核心优势
- ✅ 动态灵活性:运行时调整线程数
- ✅ 阶段复用:同一实例协调多个阶段
- ✅ 细粒度控制:支持父子Phaser分层结构
适用场景
- 分阶段任务:如MapReduce中的多轮数据处理
- 动态线程池:参与者数量可变的任务
- 游戏开发:多关卡/回合制同步
踩坑提醒
- ❌ 避免忘记调用
arriveAndDeregister()
导致资源泄漏 - ❌ 阶段推进后未重置状态可能导致逻辑错误
- ⚠️ 大规模并发时考虑分层Phaser减少竞争
完整代码示例见GitHub项目,直接导入Maven项目即可运行测试。