1. 概述

本文将深入探讨 java.util.concurrent.Exchanger<T> 的使用。Exchanger 提供了一个线程配对交换数据的同步点,适用于两个线程之间高效、成对地交换对象。

它的设计非常简单,核心就是“你等我,我等你,碰头就换,换完就走”。

2. Exchanger 基本用法

Exchanger<T> 是 Java 并发包中一个专门用于两个线程之间交换对象的工具类。它只提供了一个核心方法:

public V exchange(V x) throws InterruptedException

工作原理

  • 当一个线程调用 exchange() 时,它会阻塞等待另一个线程也调用 exchange()
  • 一旦两个线程都到达交换点(rendezvous point),它们持有的对象就会互换
  • 交换完成后,两个线程各自拿到对方的数据,继续执行

⚠️ 注意:必须是成对的两个线程,多一个少一个都不行。如果只有一个线程调用,它会一直阻塞下去(除非使用带超时的版本)。

示例:线程间消息交换

下面是一个典型的两个线程互相交换字符串的场景:

@Test
public void givenThreads_whenMessageExchanged_thenCorrect() {
    Exchanger<String> exchanger = new Exchanger<>();

    Runnable taskA = () -> {
        try {
            String message = exchanger.exchange("from A");
            assertEquals("from B", message);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    };

    Runnable taskB = () -> {
        try {
            String message = exchanger.exchange("from B");
            assertEquals("from A", message);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    };
    CompletableFuture.allOf(
      runAsync(taskA), runAsync(taskB)).join();
}

📌 关键点:

  • 两个线程几乎同时调用 exchange(),触发交换
  • taskA 发送 "from A",收到 "from B"
  • taskB 发送 "from B",收到 "from A"

主线程与子线程交换数据

也可以是主线程和一个子线程进行交换:

@Test
public void givenThread_WhenExchangedMessage_thenCorrect() throws InterruptedException {
    Exchanger<String> exchanger = new Exchanger<>();

    Runnable runner = () -> {
        try {
            String message = exchanger.exchange("from runner");
            assertEquals("to runner", message);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    };
    CompletableFuture<Void> result 
      = CompletableFuture.runAsync(runner);
    String msg = exchanger.exchange("to runner");
    assertEquals("from runner", msg);
    result.join();
}

⚠️ 踩坑提醒:

  • 必须先启动子线程(runAsync),再在主线程调用 exchange()
  • 否则主线程先执行 exchange(),会一直阻塞,子线程还没起来,没人和它“碰头”

超时机制

如果担心对方线程出问题导致无限等待,可以用带超时的版本:

V exchange(V x, long timeout, TimeUnit unit)

这样如果另一个线程迟迟不出现,当前线程可以在指定时间后抛出 TimeoutException,避免死锁。

3. 无 GC 数据交换:构建高性能数据流水线

Exchanger 的一个高级用法是构建低 GC 压力的数据处理流水线。通过重复利用缓冲区对象,避免频繁创建/销毁对象,从而减少垃圾回收压力。

场景设计

我们构建一个三段式流水线:

  1. Reader:生产数据,填满缓冲区后交换给 Processor
  2. Processor:处理数据,处理完后交换给 Writer
  3. Writer:消费数据,打印输出

通过两个 Exchanger 实现:

  • readerExchanger:Reader ↔ Processor
  • writerExchanger:Processor ↔ Writer

完整示例代码

@Test
public void givenData_whenPassedThrough_thenCorrect() throws InterruptedException {

    Exchanger<Queue<String>> readerExchanger = new Exchanger<>();
    Exchanger<Queue<String>> writerExchanger = new Exchanger<>();

    final int BUFFER_SIZE = 3;

    Runnable reader = () -> {
        Queue<String> readerBuffer = new ConcurrentLinkedQueue<>();
        while (true) {
            readerBuffer.add(UUID.randomUUID().toString());
            if (readerBuffer.size() >= BUFFER_SIZE) {
                try {
                    readerBuffer = readerExchanger.exchange(readerBuffer);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    };

    Runnable processor = () -> {
        Queue<String> processorBuffer = new ConcurrentLinkedQueue<>();
        Queue<String> writerBuffer = new ConcurrentLinkedQueue<>();
        try {
            processorBuffer = readerExchanger.exchange(processorBuffer);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        while (true) {
            String item = processorBuffer.poll();
            if (item != null) {
                writerBuffer.add("processed:" + item);
            }
            if (processorBuffer.isEmpty()) {
                try {
                    processorBuffer = readerExchanger.exchange(processorBuffer);
                    writerBuffer = writerExchanger.exchange(writerBuffer);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    };

    Runnable writer = () -> {
        Queue<String> writerBuffer = new ConcurrentLinkedQueue<>();
        try {
            writerBuffer = writerExchanger.exchange(writerBuffer);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        while (true) {
            String item = writerBuffer.poll();
            if (item != null) {
                System.out.println(item);
            }
            if (writerBuffer.isEmpty()) {
                try {
                    writerBuffer = writerExchanger.exchange(writerBuffer);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    };

    CompletableFuture.allOf(
      CompletableFuture.runAsync(reader), 
      CompletableFuture.runAsync(processor),
      CompletableFuture.runAsync(writer)).join();
}

核心优势:零 GC 交换

为什么说“无 GC”?

  • 每个线程持有一个缓冲区(如 ConcurrentLinkedQueue
  • 交换的是缓冲区对象本身,而不是里面的数据
  • 交换后,原缓冲区被重用,旧对象不会被丢弃
  • 避免了频繁创建新对象 → 减少 Young GC 次数

❌ 对比传统方式:

  • 使用 BlockingQueue 传输数据,每条数据都要入队出队,可能创建 Node 对象
  • 缓冲区本身也可能频繁创建销毁

📌 类比:这类似于 Disruptor 模式,但更轻量:

  • Disruptor 支持多生产者/消费者
  • Exchanger 仅限成对线程,但实现更简单,适合点对点场景

⚠️ 注意事项:

  • 示例中使用了 while(true),实际应用中需添加优雅关闭逻辑
  • 异常处理已简化,生产环境需更 robust

4. 总结

Exchanger 是一个简单粗暴但非常高效的线程间数据交换工具,适用于以下场景:

适用场景

  • 成对线程间批量数据交换
  • 构建双缓冲(double-buffering)或流水线架构
  • 追求极致性能,减少对象创建和 GC 压力

不适用场景

  • 多于两个线程需要交换数据
  • 需要广播或一对多通信
  • 数据交换频率极低(同步开销可能不值得)

📌 最佳实践:

  • 配合对象池或缓冲区复用,发挥最大性能
  • 使用 exchange(T, timeout) 避免无限等待
  • 注意线程启动顺序,防止死锁

示例代码已上传至 GitHub:https://github.com/yourname/java-concurrency-examples


原始标题:Introduction to Exchanger in Java