1. 简介

CyclicBarrier是Java 5在java.util.concurrent包中引入的同步工具。本文将深入探讨这个同步器在并发场景中的实际应用。

2. Java并发同步器

java.util.concurrent包包含多个用于管理线程协作的类,主要包括:

  • CyclicBarrier
  • Phaser
  • CountDownLatch
  • Exchanger
  • Semaphore
  • SynchronousQueue

这些类为常见线程交互模式提供了开箱即用的功能。当线程间存在标准协作模式时,直接使用这些同步器(Synchronizers)远比手动实现锁和条件对象更高效。接下来我们重点解析CyclicBarrier。

3. CyclicBarrier核心概念

CyclicBarrier是一种同步器,允许一组线程互相等待,直到所有线程都到达某个公共执行点(称为屏障)。

CyclicBarrier适用于固定数量线程的场景:这些线程必须全部到达屏障点后才能继续执行。

之所以称为"循环"(Cyclic),是因为等待线程释放后,屏障可以被重复使用

4. 基本用法

CyclicBarrier的构造器很简单,只需指定需要等待的线程数量(称为"参与方"):

public CyclicBarrier(int parties)

线程通过调用await()方法注册自己已到达屏障点。该调用是同步的,线程会挂起直到指定数量的线程都调用了await()。当所有线程都到达时,称为"触发屏障"(tripping the barrier)

可选地,可以传入第二个参数——一个Runnable实例,由最后一个触发屏障的线程执行:

public CyclicBarrier(int parties, Runnable barrierAction)

5. 实战案例

考虑这个场景:多个线程分别计算部分结果,当所有线程完成后,由最后一个线程汇总所有结果。

5.1 主类实现

public class CyclicBarrierDemo {

    private CyclicBarrier cyclicBarrier;
    private List<List<Integer>> partialResults
     = Collections.synchronizedList(new ArrayList<>());
    private Random random = new Random();
    private int NUM_PARTIAL_RESULTS;
    private int NUM_WORKERS;

    // ...
}

关键点说明:

  • NUM_WORKERS:工作线程数
  • NUM_PARTIAL_RESULTS:每个线程产生的结果数
  • partialResults:使用线程安全的SynchronizedList存储结果(普通ArrayList的add()方法非线程安全)

5.2 工作线程实现

public class CyclicBarrierDemo {

    // ...

    class NumberCruncherThread implements Runnable {

        @Override
        public void run() {
            String thisThreadName = Thread.currentThread().getName();
            List<Integer> partialResult = new ArrayList<>();

            // 计算部分结果
            for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) {    
                Integer num = random.nextInt(10);
                System.out.println(thisThreadName
                  + ": Crunching some numbers! Final result - " + num);
                partialResult.add(num);
            }

            partialResults.add(partialResult);
            try {
                System.out.println(thisThreadName 
                  + " waiting for others to reach barrier.");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                // 处理中断
            } catch (BrokenBarrierException e) {
                // 处理屏障破坏
            }
        }
    }
}

5.3 汇总线程实现

public class CyclicBarrierDemo {

    // ...
    
    class AggregatorThread implements Runnable {

        @Override
        public void run() {

            String thisThreadName = Thread.currentThread().getName();

            System.out.println(
              thisThreadName + ": Computing sum of " + NUM_WORKERS 
              + " workers, having " + NUM_PARTIAL_RESULTS + " results each.");
            int sum = 0;

            for (List<Integer> threadResult : partialResults) {
                System.out.print("Adding ");
                for (Integer partialResult : threadResult) {
                    System.out.print(partialResult+" ");
                    sum += partialResult;
                }
                System.out.println();
            }
            System.out.println(thisThreadName + ": Final result = " + sum);
        }
    }
}

5.4 启动逻辑

public class CyclicBarrierDemo {

    // 前述代码
 
    public void runSimulation(int numWorkers, int numberOfPartialResults) {
        NUM_PARTIAL_RESULTS = numberOfPartialResults;
        NUM_WORKERS = numWorkers;

        cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread());

        System.out.println("Spawning " + NUM_WORKERS
          + " worker threads to compute "
          + NUM_PARTIAL_RESULTS + " partial results each");
 
        for (int i = 0; i < NUM_WORKERS; i++) {
            Thread worker = new Thread(new NumberCruncherThread());
            worker.setName("Thread " + i);
            worker.start();
        }
    }

    public static void main(String[] args) {
        CyclicBarrierDemo demo = new CyclicBarrierDemo();
        demo.runSimulation(5, 3);
    }
}

关键流程

  1. 创建5个工作线程,每个线程计算3个随机数
  2. 所有线程完成计算后,最后一个触发屏障的线程执行汇总逻辑

6. 执行结果

典型输出如下(每次执行结果可能不同):

Spawning 5 worker threads to compute 3 partial results each
Thread 0: Crunching some numbers! Final result - 6
Thread 0: Crunching some numbers! Final result - 2
Thread 0: Crunching some numbers! Final result - 2
Thread 0 waiting for others to reach barrier.
Thread 1: Crunching some numbers! Final result - 2
Thread 1: Crunching some numbers! Final result - 0
Thread 1: Crunching some numbers! Final result - 5
Thread 1 waiting for others to reach barrier.
Thread 3: Crunching some numbers! Final result - 6
Thread 3: Crunching some numbers! Final result - 4
Thread 3: Crunching some numbers! Final result - 0
Thread 3 waiting for others to reach barrier.
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 0
Thread 2 waiting for others to reach barrier.
Thread 4: Crunching some numbers! Final result - 9
Thread 4: Crunching some numbers! Final result - 3
Thread 4: Crunching some numbers! Final result - 5
Thread 4 waiting for others to reach barrier.
Thread 4: Computing final sum of 5 workers, having 3 results each.
Adding 6 2 2 
Adding 2 0 5 
Adding 6 4 0 
Adding 1 1 0 
Adding 9 3 5 
Thread 4: Final result = 46

关键观察

  • Thread 4是最后一个触发屏障的线程,因此执行了汇总逻辑
  • 线程实际执行顺序与启动顺序无关(这是并发编程的常见现象)

7. 总结

本文详细解析了CyclicBarrier的核心概念和适用场景,并通过完整案例展示了如何实现多线程协作计算。当需要固定数量的线程在某个公共点同步时,CyclicBarrier是简单高效的解决方案。

完整代码示例可在GitHub仓库获取。


原始标题:CyclicBarrier in Java