1. 引言

本文将深入探讨 RxJava 中组合 Observable 的多种方式。如果你刚接触 RxJava,建议先阅读这篇入门教程

直接进入正题。

2. Observable 基础

Observable 序列(简称 Observable)是异步数据流的表示形式。它基于观察者模式,其中 Observer 对象订阅 Observable 发射的数据项。

这种订阅是非阻塞的,因为 Observer 只需响应 Observable 未来发射的数据,天然支持并发。RxJava 中的简单示例:

Observable
  .from(new String[] { "John", "Doe" })
  .subscribe(name -> System.out.println("Hello " + name))

3. 组合 Observable 的方法

在响应式编程中,组合多个 Observable 是常见需求。例如在 Web 应用中,可能需要同时获取两个独立的异步数据流:

最佳实践:无需等待前一个流完成再请求下一个,而是同时调用并订阅组合后的流。

下面介绍几种组合方式及其适用场景:

3.1. 合并操作 (merge)

使用 merge 操作符可将多个 Observable 的输出合并为一个流:

@Test
public void givenTwoObservables_whenMerged_shouldEmitCombinedResults() {
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();

    Observable.merge(
      Observable.from(new String[] {"Hello", "World"}),
      Observable.from(new String[] {"I love", "RxJava"})
    ).subscribe(testSubscriber);

    testSubscriber.assertValues("Hello", "World", "I love", "RxJava");
}

3.2. 延迟错误合并 (mergeDelayError)

mergeDelayErrormerge 类似,但遇到错误时会继续发射无错误的数据,最后再抛出异常

@Test
public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError() {
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();
        
    Observable.mergeDelayError(
      Observable.from(new String[] { "hello", "world" }),
      Observable.error(new RuntimeException("Some exception")),
      Observable.from(new String[] { "rxjava" })
    ).subscribe(testSubscriber);

    testSubscriber.assertValues("hello", "world", "rxjava");
    testSubscriber.assertError(RuntimeException.class);
}

输出结果:

hello
world
rxjava

⚠️ 关键区别:若改用 merge,字符串 "rxjava" 不会发射,因为遇到错误会立即终止数据流。

3.3. 压缩操作 (zip)

zip 操作符将两个序列的值按配对组合

@Test
public void givenTwoObservables_whenZipped_thenReturnCombinedResults() {
    List<String> zippedStrings = new ArrayList<>();

    Observable.zip(
      Observable.from(new String[] { "Simple", "Moderate", "Complex" }), 
      Observable.from(new String[] { "Solutions", "Success", "Hierarchy"}),
      (str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add);
        
    assertThat(zippedStrings).isNotEmpty();
    assertThat(zippedStrings.size()).isEqualTo(3);
    assertThat(zippedStrings).contains("Simple Solutions", "Moderate Success", "Complex Hierarchy");
}

3.4. 与时间间隔压缩 (zip with interval)

将数据流与interval压缩,可实现延迟发射数据的效果:

@Test
public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission() {
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();
        
    Observable<String> data = Observable.just("one", "two", "three", "four", "five");
    Observable<Long> interval = Observable.interval(1L, TimeUnit.SECONDS);
        
    Observable
      .zip(data, interval, (strData, tick) -> String.format("[%d]=%s", tick, strData))
      .toBlocking().subscribe(testSubscriber);
        
    testSubscriber.assertCompleted();
    testSubscriber.assertValueCount(5);
    testSubscriber.assertValues("[0]=one", "[1]=two", "[2]=three", "[3]=four", "[4]=five");
}

4. 总结

本文介绍了 RxJava 中组合 Observable 的几种核心方法。其他高级操作符如:

可参考RxJava 官方文档深入学习。

本文源码见 GitHub 仓库


原始标题:Combining Observables in RxJava