1. 概述
本文将深入探讨 java.util.concurrent.Exchanger<T>
的使用。Exchanger 提供了一个线程配对交换数据的同步点,适用于两个线程之间高效、成对地交换对象。
它的设计非常简单,核心就是“你等我,我等你,碰头就换,换完就走”。
2. Exchanger 基本用法
Exchanger<T>
是 Java 并发包中一个专门用于两个线程之间交换对象的工具类。它只提供了一个核心方法:
public V exchange(V x) throws InterruptedException
✅ 工作原理:
- 当一个线程调用
exchange()
时,它会阻塞等待另一个线程也调用exchange()
- 一旦两个线程都到达交换点(rendezvous point),它们持有的对象就会互换
- 交换完成后,两个线程各自拿到对方的数据,继续执行
⚠️ 注意:必须是成对的两个线程,多一个少一个都不行。如果只有一个线程调用,它会一直阻塞下去(除非使用带超时的版本)。
示例:线程间消息交换
下面是一个典型的两个线程互相交换字符串的场景:
@Test
public void givenThreads_whenMessageExchanged_thenCorrect() {
Exchanger<String> exchanger = new Exchanger<>();
Runnable taskA = () -> {
try {
String message = exchanger.exchange("from A");
assertEquals("from B", message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
};
Runnable taskB = () -> {
try {
String message = exchanger.exchange("from B");
assertEquals("from A", message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
};
CompletableFuture.allOf(
runAsync(taskA), runAsync(taskB)).join();
}
📌 关键点:
- 两个线程几乎同时调用
exchange()
,触发交换 taskA
发送"from A"
,收到"from B"
taskB
发送"from B"
,收到"from A"
主线程与子线程交换数据
也可以是主线程和一个子线程进行交换:
@Test
public void givenThread_WhenExchangedMessage_thenCorrect() throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<>();
Runnable runner = () -> {
try {
String message = exchanger.exchange("from runner");
assertEquals("to runner", message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
};
CompletableFuture<Void> result
= CompletableFuture.runAsync(runner);
String msg = exchanger.exchange("to runner");
assertEquals("from runner", msg);
result.join();
}
⚠️ 踩坑提醒:
- 必须先启动子线程(
runAsync
),再在主线程调用exchange()
- 否则主线程先执行
exchange()
,会一直阻塞,子线程还没起来,没人和它“碰头”
超时机制
如果担心对方线程出问题导致无限等待,可以用带超时的版本:
V exchange(V x, long timeout, TimeUnit unit)
这样如果另一个线程迟迟不出现,当前线程可以在指定时间后抛出 TimeoutException
,避免死锁。
3. 无 GC 数据交换:构建高性能数据流水线
Exchanger 的一个高级用法是构建低 GC 压力的数据处理流水线。通过重复利用缓冲区对象,避免频繁创建/销毁对象,从而减少垃圾回收压力。
场景设计
我们构建一个三段式流水线:
- Reader:生产数据,填满缓冲区后交换给 Processor
- Processor:处理数据,处理完后交换给 Writer
- Writer:消费数据,打印输出
通过两个 Exchanger 实现:
readerExchanger
:Reader ↔ ProcessorwriterExchanger
:Processor ↔ Writer
完整示例代码
@Test
public void givenData_whenPassedThrough_thenCorrect() throws InterruptedException {
Exchanger<Queue<String>> readerExchanger = new Exchanger<>();
Exchanger<Queue<String>> writerExchanger = new Exchanger<>();
final int BUFFER_SIZE = 3;
Runnable reader = () -> {
Queue<String> readerBuffer = new ConcurrentLinkedQueue<>();
while (true) {
readerBuffer.add(UUID.randomUUID().toString());
if (readerBuffer.size() >= BUFFER_SIZE) {
try {
readerBuffer = readerExchanger.exchange(readerBuffer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
};
Runnable processor = () -> {
Queue<String> processorBuffer = new ConcurrentLinkedQueue<>();
Queue<String> writerBuffer = new ConcurrentLinkedQueue<>();
try {
processorBuffer = readerExchanger.exchange(processorBuffer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
while (true) {
String item = processorBuffer.poll();
if (item != null) {
writerBuffer.add("processed:" + item);
}
if (processorBuffer.isEmpty()) {
try {
processorBuffer = readerExchanger.exchange(processorBuffer);
writerBuffer = writerExchanger.exchange(writerBuffer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
};
Runnable writer = () -> {
Queue<String> writerBuffer = new ConcurrentLinkedQueue<>();
try {
writerBuffer = writerExchanger.exchange(writerBuffer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
while (true) {
String item = writerBuffer.poll();
if (item != null) {
System.out.println(item);
}
if (writerBuffer.isEmpty()) {
try {
writerBuffer = writerExchanger.exchange(writerBuffer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
};
CompletableFuture.allOf(
CompletableFuture.runAsync(reader),
CompletableFuture.runAsync(processor),
CompletableFuture.runAsync(writer)).join();
}
核心优势:零 GC 交换
✅ 为什么说“无 GC”?
- 每个线程持有一个缓冲区(如
ConcurrentLinkedQueue
) - 交换的是缓冲区对象本身,而不是里面的数据
- 交换后,原缓冲区被重用,旧对象不会被丢弃
- 避免了频繁创建新对象 → 减少 Young GC 次数
❌ 对比传统方式:
- 使用
BlockingQueue
传输数据,每条数据都要入队出队,可能创建 Node 对象 - 缓冲区本身也可能频繁创建销毁
📌 类比:这类似于 Disruptor 模式,但更轻量:
- Disruptor 支持多生产者/消费者
- Exchanger 仅限成对线程,但实现更简单,适合点对点场景
⚠️ 注意事项:
- 示例中使用了
while(true)
,实际应用中需添加优雅关闭逻辑 - 异常处理已简化,生产环境需更 robust
4. 总结
Exchanger 是一个简单粗暴但非常高效的线程间数据交换工具,适用于以下场景:
✅ 适用场景:
- 成对线程间批量数据交换
- 构建双缓冲(double-buffering)或流水线架构
- 追求极致性能,减少对象创建和 GC 压力
❌ 不适用场景:
- 多于两个线程需要交换数据
- 需要广播或一对多通信
- 数据交换频率极低(同步开销可能不值得)
📌 最佳实践:
- 配合对象池或缓冲区复用,发挥最大性能
- 使用
exchange(T, timeout)
避免无限等待 - 注意线程启动顺序,防止死锁
示例代码已上传至 GitHub:https://github.com/yourname/java-concurrency-examples