1. 概述

在RxJava中,concat()merge()是两个用于组合多个Observable实例的强大操作符。

  • concat()顺序发射每个Observable的数据,必须等当前Observable完成后才开始下一个
  • merge()并发混合所有Observable的数据,按实际产生顺序发射

本文将深入探讨这两种操作符在同步/异步数据源场景下的行为差异,通过代码示例和时序图直观对比它们的执行特点。

2. 同步数据源场景

当所有数据源都是同步时,concat()merge()的行为完全一致。下面通过具体场景验证。

2.1 场景搭建

使用Observable.just()创建三个同步数据源:

Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<Integer> observable2 = Observable.just(4, 5, 6);
Observable<Integer> observable3 = Observable.just(7, 8, 9);

创建两个测试订阅者:

TestSubscriber<Integer> testSubscriberForConcat = new TestSubscriber<>();
TestSubscriber<Integer> testSubscriberForMerge = new TestSubscriber<>();

2.2 操作符对比

**使用concat()**:

Observable.concat(observable1, observable2, observable3)
  .subscribe(testSubscriberForConcat);

验证发射顺序(严格按数据源顺序):

testSubscriberForConcat.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9);

concat()同步执行时序图

**使用merge()**:

Observable.merge(observable1, observable2, observable3).subscribe(testSubscriberForMerge);

验证结果(顺序与concat相同):

testSubscriberForMerge.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9);

merge()同步执行时序图

💡 关键结论:同步数据源会立即发射所有数据并完成,导致两个操作符产生相同输出。但实际开发中仍需注意:

  • 需要严格顺序 → 用concat()
  • 允许混合发射 → 用merge()

3. 可预测异步数据源

当数据源异步且发射间隔不同时,两个操作符的行为差异开始显现。

3.1 场景搭建

创建两个异步数据源:

// 每100ms发射一个,共3个
Observable<Integer> observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
  .map(i -> i.intValue() + 1)
  .take(3);

// 每30ms发射一个,共7个  
Observable<Integer> observable2 = Observable.interval(30, TimeUnit.MILLISECONDS)
  .map(i -> i.intValue() + 4)
  .take(7);

测试订阅者:

TestSubscriber<Integer> testSubscriberForConcat = new TestSubscriber<>();
TestSubscriber<Integer> testSubscriberForMerge = new TestSubscriber<>();

3.2 操作符对比

concat()行为

Observable.concat(observable1, observable2)
  .subscribe(testSubscriberForConcat);
testSubscriberForConcat.awaitTerminalEvent(); // 等待所有发射完成

验证结果(先完成observable1再开始observable2):

testSubscriber.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

concat()异步顺序执行时序图

merge()行为

Observable.merge(observable1, observable2)
  .subscribe(testSubscriberForMerge);
testSubscriberForMerge.awaitTerminalEvent();

验证结果(按实际发射时间混合):

testSubscriberForMerge.assertValues(4, 5, 6, 1, 7, 8, 9, 2, 10, 3);

merge()异步混合发射时序图

⚠️ 核心差异merge()会按实际发射时间混合数据,而concat()强制数据源级顺序

4. 存在竞争条件的异步数据源

当多个数据源以相同间隔异步发射时,结果顺序变得不可预测。

4.1 场景搭建

创建两个间隔相同的异步数据源:

// 均为100ms间隔,共3个数据
Observable<Integer> observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
  .map(i -> i.intValue() + 1)
  .take(3);

Observable<Integer> observable2 = Observable.interval(100, TimeUnit.MILLISECONDS)
  .map(i -> i.intValue() + 4)
  .take(3);

测试订阅者:

TestSubscriber<Integer> testSubscriberForConcat = new TestSubscriber<>();
TestSubscriber<Integer> testSubscriberForMerge = new TestSubscriber<>();

4.2 操作符对比

concat()行为(顺序保持不变):

Observable.concat(observable1, observable2)
  .subscribe(testSubscriberForConcat);
testSubscriberForConcat.awaitTerminalEvent();

// 验证严格顺序
testSubscriberForConcat.assertValues(1, 2, 3, 4, 5, 6);

concat()竞争条件下的顺序执行

merge()行为(顺序不可预测):

Observable.merge(observable1, observable2)
  .subscribe(testSubscriberForMerge);
testSubscriberForMerge.awaitTerminalEvent();

// 验证数据完整性(不验证顺序)
List<Integer> actual = testSubscriberForMerge.getOnNextEvents();
List<Integer> expected = Arrays.asList(1, 2, 3, 4, 5, 6);
assertTrue(actual.containsAll(expected) && expected.containsAll(actual));

// 实际输出示例(每次运行可能不同)
// 21:05:43.252 [main] INFO actual emissions: [4, 1, 2, 5, 3, 6]

merge()竞争条件下的混合发射

踩坑警告:在竞争条件下,merge()的输出顺序无法预测!如果业务依赖顺序,必须改用concat()

5. 总结

通过三种典型场景的对比,我们清晰看到concat()merge()的核心差异:

场景 concat() 行为 merge() 行为
同步数据源 严格顺序(与merge相同) 严格顺序(与concat相同)
可预测异步源 数据源级顺序(先1后2) 按实际发射时间混合
竞争条件异步源 数据源级顺序(稳定可预测) 无序混合(每次运行可能不同)

选择建议

  • ✅ 需要保证数据源级顺序 → concat()
  • ✅ 允许按时间混合发射 → merge()
  • ❌ 在竞争条件下依赖顺序 → 避免使用merge()

完整示例代码已上传至 GitHub仓库,建议实际运行体验差异。


原始标题:concat() vs. merge() Operators in RxJava Observables | Baeldung