2. 1. 概述

本文将深入探讨java.util.concurrent包中的Phaser工具类。它与CountDownLatch类似,都是用于线程协调的并发控制工具,但提供了更灵活的功能。

Phaser本质上是一个动态可调的同步屏障,允许线程在屏障处等待,直到所有参与者都到达后才能继续执行。与CountDownLatch最大的区别在于:

  • 动态调整参与者数量:可以在运行时增减线程数
  • CountDownLatch:必须在创建时指定固定线程数

2. 2. Phaser API详解

Phaser的核心机制是多阶段协调,每个阶段可以有不同的线程参与。关键API包括:

核心方法

  1. register()
    线程注册参与当前阶段,增加参与者计数

    phaser.register(); // 当前线程注册
    
  2. arriveAndAwaitAdvance()
    到达屏障并等待其他线程(阻塞方法)

    phaser.arriveAndAwaitAdvance(); // 等待直到所有线程到达
    
  3. arriveAndDeregister()
    完成任务后注销当前线程

    phaser.arriveAndDeregister(); // 完成后注销
    
  4. getPhase()
    获取当前阶段号

    int currentPhase = phaser.getPhase();
    

工作流程

graph TD
    A[线程注册] --> B[到达屏障]
    B --> C{所有线程到达?}
    C -->|否| D[继续等待]
    C -->|是| E[阶段推进]
    E --> F[线程注销]

⚠️ 注意:注册后无法直接检查某线程是否已注册,需要子类化才能支持此功能。

2. 3. 使用Phaser API实现逻辑

假设需要协调两个执行阶段:

  • 阶段1:3个线程处理
  • 阶段2:2个线程处理

实现一个任务类LongRunningAction

class LongRunningAction implements Runnable {
    private String threadName;
    private Phaser ph;

    LongRunningAction(String threadName, Phaser ph) {
        this.threadName = threadName;
        this.ph = ph;
        this.randomWait(); // 模拟初始化耗时
        ph.register(); // 注册参与
    }

    @Override
    public void run() {
        // 阶段1:等待所有线程到达
        ph.arriveAndAwaitAdvance();
        
        // 模拟任务执行
        randomWait();
        
        // 完成后注销
        ph.arriveAndDeregister();
    }

    // 模拟真实工作负载
    private void randomWait() {
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

关键点说明

  1. 初始化时注册:构造函数中调用register()
  2. 屏障等待arriveAndAwaitAdvance()实现阶段同步
  3. 资源清理:完成后调用arriveAndDeregister()
  4. 随机延迟randomWait()模拟真实场景的耗时差异

2. 4. 测试Phaser API

测试场景:先启动3个线程执行阶段1,再启动2个线程执行阶段2。

初始化设置

// 创建Phaser,主线程作为协调者
Phaser ph = new Phaser(1); 
assertEquals(0, ph.getPhase()); // 初始阶段为0

💡 为什么参数是1?
主线程作为协调者参与计数,所以初始注册数=1(主线程)+3(工作线程)=4

阶段1:3个线程执行

// 启动3个工作线程
new Thread(new LongRunningAction("thread-1", ph)).start();
new Thread(new LongRunningAction("thread-2", ph)).start();
new Thread(new LongRunningAction("thread-3", ph)).start();

// 主线程等待(第4个参与者)
ph.arriveAndAwaitAdvance();
assertEquals(1, ph.getPhase()); // 阶段推进到1

阶段2:2个线程执行

// 动态调整:新增2个线程
new Thread(new LongRunningAction("thread-4", ph)).start();
new Thread(new LongRunningAction("thread-5", ph)).start();

// 主线程再次等待
ph.arriveAndAwaitAdvance();
assertEquals(2, ph.getPhase()); // 阶段推进到2

// 主线程注销
ph.arriveAndDeregister();

执行输出示例

Thread thread-1 registered during phase 0
Thread thread-2 registered during phase 0
Thread thread-3 registered during phase 0
Thread main waiting for others
Thread thread-1 BEFORE long running action in phase 0
Thread thread-2 BEFORE long running action in phase 0
Thread thread-3 BEFORE long running action in phase 0
Thread main proceeding in phase 1
Thread thread-4 registered during phase 1
Thread thread-5 registered during phase 1
Thread thread-4 BEFORE long running action in phase 1
Thread thread-5 BEFORE long running action in phase 1
Thread main waiting for new phase
Thread main proceeding in phase 2
Thread thread-4 AFTER long running action in phase 2
Thread thread-5 AFTER long running action in phase 2

2. 5. 总结

Phaser是Java并发工具箱中的利器,特别适合需要多阶段协调的场景:

核心优势

  • 动态灵活性:运行时调整线程数
  • 阶段复用:同一实例协调多个阶段
  • 细粒度控制:支持父子Phaser分层结构

适用场景

  1. 分阶段任务:如MapReduce中的多轮数据处理
  2. 动态线程池:参与者数量可变的任务
  3. 游戏开发:多关卡/回合制同步

踩坑提醒

  • ❌ 避免忘记调用arriveAndDeregister()导致资源泄漏
  • ❌ 阶段推进后未重置状态可能导致逻辑错误
  • ⚠️ 大规模并发时考虑分层Phaser减少竞争

完整代码示例见GitHub项目,直接导入Maven项目即可运行测试。


原始标题:Guide to Phaser

« 上一篇: Java周报,176