1. 概述
在RxJava中,concat()
和merge()
是两个用于组合多个Observable
实例的强大操作符。
- ✅
concat()
:顺序发射每个Observable
的数据,必须等当前Observable
完成后才开始下一个 - ✅
merge()
:并发混合所有Observable
的数据,按实际产生顺序发射
本文将深入探讨这两种操作符在同步/异步数据源场景下的行为差异,通过代码示例和时序图直观对比它们的执行特点。
2. 同步数据源场景
当所有数据源都是同步时,concat()
和merge()
的行为完全一致。下面通过具体场景验证。
2.1 场景搭建
使用Observable.just()
创建三个同步数据源:
Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<Integer> observable2 = Observable.just(4, 5, 6);
Observable<Integer> observable3 = Observable.just(7, 8, 9);
创建两个测试订阅者:
TestSubscriber<Integer> testSubscriberForConcat = new TestSubscriber<>();
TestSubscriber<Integer> testSubscriberForMerge = new TestSubscriber<>();
2.2 操作符对比
**使用concat()
**:
Observable.concat(observable1, observable2, observable3)
.subscribe(testSubscriberForConcat);
验证发射顺序(严格按数据源顺序):
testSubscriberForConcat.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9);
**使用merge()
**:
Observable.merge(observable1, observable2, observable3).subscribe(testSubscriberForMerge);
验证结果(顺序与concat相同):
testSubscriberForMerge.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9);
💡 关键结论:同步数据源会立即发射所有数据并完成,导致两个操作符产生相同输出。但实际开发中仍需注意:
- 需要严格顺序 → 用
concat()
- 允许混合发射 → 用
merge()
3. 可预测异步数据源
当数据源异步且发射间隔不同时,两个操作符的行为差异开始显现。
3.1 场景搭建
创建两个异步数据源:
// 每100ms发射一个,共3个
Observable<Integer> observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
.map(i -> i.intValue() + 1)
.take(3);
// 每30ms发射一个,共7个
Observable<Integer> observable2 = Observable.interval(30, TimeUnit.MILLISECONDS)
.map(i -> i.intValue() + 4)
.take(7);
测试订阅者:
TestSubscriber<Integer> testSubscriberForConcat = new TestSubscriber<>();
TestSubscriber<Integer> testSubscriberForMerge = new TestSubscriber<>();
3.2 操作符对比
concat()
行为:
Observable.concat(observable1, observable2)
.subscribe(testSubscriberForConcat);
testSubscriberForConcat.awaitTerminalEvent(); // 等待所有发射完成
验证结果(先完成observable1再开始observable2):
testSubscriber.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
merge()
行为:
Observable.merge(observable1, observable2)
.subscribe(testSubscriberForMerge);
testSubscriberForMerge.awaitTerminalEvent();
验证结果(按实际发射时间混合):
testSubscriberForMerge.assertValues(4, 5, 6, 1, 7, 8, 9, 2, 10, 3);
⚠️ 核心差异:
merge()
会按实际发射时间混合数据,而concat()
强制数据源级顺序。
4. 存在竞争条件的异步数据源
当多个数据源以相同间隔异步发射时,结果顺序变得不可预测。
4.1 场景搭建
创建两个间隔相同的异步数据源:
// 均为100ms间隔,共3个数据
Observable<Integer> observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
.map(i -> i.intValue() + 1)
.take(3);
Observable<Integer> observable2 = Observable.interval(100, TimeUnit.MILLISECONDS)
.map(i -> i.intValue() + 4)
.take(3);
测试订阅者:
TestSubscriber<Integer> testSubscriberForConcat = new TestSubscriber<>();
TestSubscriber<Integer> testSubscriberForMerge = new TestSubscriber<>();
4.2 操作符对比
concat()
行为(顺序保持不变):
Observable.concat(observable1, observable2)
.subscribe(testSubscriberForConcat);
testSubscriberForConcat.awaitTerminalEvent();
// 验证严格顺序
testSubscriberForConcat.assertValues(1, 2, 3, 4, 5, 6);
merge()
行为(顺序不可预测):
Observable.merge(observable1, observable2)
.subscribe(testSubscriberForMerge);
testSubscriberForMerge.awaitTerminalEvent();
// 验证数据完整性(不验证顺序)
List<Integer> actual = testSubscriberForMerge.getOnNextEvents();
List<Integer> expected = Arrays.asList(1, 2, 3, 4, 5, 6);
assertTrue(actual.containsAll(expected) && expected.containsAll(actual));
// 实际输出示例(每次运行可能不同)
// 21:05:43.252 [main] INFO actual emissions: [4, 1, 2, 5, 3, 6]
❌ 踩坑警告:在竞争条件下,
merge()
的输出顺序无法预测!如果业务依赖顺序,必须改用concat()
。
5. 总结
通过三种典型场景的对比,我们清晰看到concat()
和merge()
的核心差异:
场景 | concat() 行为 |
merge() 行为 |
---|---|---|
同步数据源 | 严格顺序(与merge相同) | 严格顺序(与concat相同) |
可预测异步源 | 数据源级顺序(先1后2) | 按实际发射时间混合 |
竞争条件异步源 | 数据源级顺序(稳定可预测) | 无序混合(每次运行可能不同) |
选择建议:
- ✅ 需要保证数据源级顺序 →
concat()
- ✅ 允许按时间混合发射 →
merge()
- ❌ 在竞争条件下依赖顺序 → 避免使用
merge()
完整示例代码已上传至 GitHub仓库,建议实际运行体验差异。