1. 简介
CyclicBarrier是Java 5在java.util.concurrent
包中引入的同步工具。本文将深入探讨这个同步器在并发场景中的实际应用。
2. Java并发同步器
java.util.concurrent
包包含多个用于管理线程协作的类,主要包括:
- CyclicBarrier
- Phaser
- CountDownLatch
- Exchanger
- Semaphore
- SynchronousQueue
这些类为常见线程交互模式提供了开箱即用的功能。当线程间存在标准协作模式时,直接使用这些同步器(Synchronizers)远比手动实现锁和条件对象更高效。接下来我们重点解析CyclicBarrier。
3. CyclicBarrier核心概念
CyclicBarrier是一种同步器,允许一组线程互相等待,直到所有线程都到达某个公共执行点(称为屏障)。
CyclicBarrier适用于固定数量线程的场景:这些线程必须全部到达屏障点后才能继续执行。
之所以称为"循环"(Cyclic),是因为等待线程释放后,屏障可以被重复使用。
4. 基本用法
CyclicBarrier的构造器很简单,只需指定需要等待的线程数量(称为"参与方"):
public CyclicBarrier(int parties)
线程通过调用await()
方法注册自己已到达屏障点。该调用是同步的,线程会挂起直到指定数量的线程都调用了await()。当所有线程都到达时,称为"触发屏障"(tripping the barrier)。
可选地,可以传入第二个参数——一个Runnable实例,由最后一个触发屏障的线程执行:
public CyclicBarrier(int parties, Runnable barrierAction)
5. 实战案例
考虑这个场景:多个线程分别计算部分结果,当所有线程完成后,由最后一个线程汇总所有结果。
5.1 主类实现
public class CyclicBarrierDemo {
private CyclicBarrier cyclicBarrier;
private List<List<Integer>> partialResults
= Collections.synchronizedList(new ArrayList<>());
private Random random = new Random();
private int NUM_PARTIAL_RESULTS;
private int NUM_WORKERS;
// ...
}
关键点说明:
NUM_WORKERS
:工作线程数NUM_PARTIAL_RESULTS
:每个线程产生的结果数partialResults
:使用线程安全的SynchronizedList
存储结果(普通ArrayList的add()方法非线程安全)
5.2 工作线程实现
public class CyclicBarrierDemo {
// ...
class NumberCruncherThread implements Runnable {
@Override
public void run() {
String thisThreadName = Thread.currentThread().getName();
List<Integer> partialResult = new ArrayList<>();
// 计算部分结果
for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) {
Integer num = random.nextInt(10);
System.out.println(thisThreadName
+ ": Crunching some numbers! Final result - " + num);
partialResult.add(num);
}
partialResults.add(partialResult);
try {
System.out.println(thisThreadName
+ " waiting for others to reach barrier.");
cyclicBarrier.await();
} catch (InterruptedException e) {
// 处理中断
} catch (BrokenBarrierException e) {
// 处理屏障破坏
}
}
}
}
5.3 汇总线程实现
public class CyclicBarrierDemo {
// ...
class AggregatorThread implements Runnable {
@Override
public void run() {
String thisThreadName = Thread.currentThread().getName();
System.out.println(
thisThreadName + ": Computing sum of " + NUM_WORKERS
+ " workers, having " + NUM_PARTIAL_RESULTS + " results each.");
int sum = 0;
for (List<Integer> threadResult : partialResults) {
System.out.print("Adding ");
for (Integer partialResult : threadResult) {
System.out.print(partialResult+" ");
sum += partialResult;
}
System.out.println();
}
System.out.println(thisThreadName + ": Final result = " + sum);
}
}
}
5.4 启动逻辑
public class CyclicBarrierDemo {
// 前述代码
public void runSimulation(int numWorkers, int numberOfPartialResults) {
NUM_PARTIAL_RESULTS = numberOfPartialResults;
NUM_WORKERS = numWorkers;
cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread());
System.out.println("Spawning " + NUM_WORKERS
+ " worker threads to compute "
+ NUM_PARTIAL_RESULTS + " partial results each");
for (int i = 0; i < NUM_WORKERS; i++) {
Thread worker = new Thread(new NumberCruncherThread());
worker.setName("Thread " + i);
worker.start();
}
}
public static void main(String[] args) {
CyclicBarrierDemo demo = new CyclicBarrierDemo();
demo.runSimulation(5, 3);
}
}
关键流程:
- 创建5个工作线程,每个线程计算3个随机数
- 所有线程完成计算后,最后一个触发屏障的线程执行汇总逻辑
6. 执行结果
典型输出如下(每次执行结果可能不同):
Spawning 5 worker threads to compute 3 partial results each
Thread 0: Crunching some numbers! Final result - 6
Thread 0: Crunching some numbers! Final result - 2
Thread 0: Crunching some numbers! Final result - 2
Thread 0 waiting for others to reach barrier.
Thread 1: Crunching some numbers! Final result - 2
Thread 1: Crunching some numbers! Final result - 0
Thread 1: Crunching some numbers! Final result - 5
Thread 1 waiting for others to reach barrier.
Thread 3: Crunching some numbers! Final result - 6
Thread 3: Crunching some numbers! Final result - 4
Thread 3: Crunching some numbers! Final result - 0
Thread 3 waiting for others to reach barrier.
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 0
Thread 2 waiting for others to reach barrier.
Thread 4: Crunching some numbers! Final result - 9
Thread 4: Crunching some numbers! Final result - 3
Thread 4: Crunching some numbers! Final result - 5
Thread 4 waiting for others to reach barrier.
Thread 4: Computing final sum of 5 workers, having 3 results each.
Adding 6 2 2
Adding 2 0 5
Adding 6 4 0
Adding 1 1 0
Adding 9 3 5
Thread 4: Final result = 46
关键观察:
- Thread 4是最后一个触发屏障的线程,因此执行了汇总逻辑
- 线程实际执行顺序与启动顺序无关(这是并发编程的常见现象)
7. 总结
本文详细解析了CyclicBarrier的核心概念和适用场景,并通过完整案例展示了如何实现多线程协作计算。当需要固定数量的线程在某个公共点同步时,CyclicBarrier是简单高效的解决方案。
完整代码示例可在GitHub仓库获取。