1. 引言
本文将深入探讨 RxJava 中组合 Observable 的多种方式。如果你刚接触 RxJava,建议先阅读这篇入门教程。
直接进入正题。
2. Observable 基础
Observable 序列(简称 Observable)是异步数据流的表示形式。它基于观察者模式,其中 Observer 对象订阅 Observable 发射的数据项。
这种订阅是非阻塞的,因为 Observer 只需响应 Observable 未来发射的数据,天然支持并发。RxJava 中的简单示例:
Observable
.from(new String[] { "John", "Doe" })
.subscribe(name -> System.out.println("Hello " + name))
3. 组合 Observable 的方法
在响应式编程中,组合多个 Observable 是常见需求。例如在 Web 应用中,可能需要同时获取两个独立的异步数据流:
✅ 最佳实践:无需等待前一个流完成再请求下一个,而是同时调用并订阅组合后的流。
下面介绍几种组合方式及其适用场景:
3.1. 合并操作 (merge)
使用 merge
操作符可将多个 Observable 的输出合并为一个流:
@Test
public void givenTwoObservables_whenMerged_shouldEmitCombinedResults() {
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
Observable.merge(
Observable.from(new String[] {"Hello", "World"}),
Observable.from(new String[] {"I love", "RxJava"})
).subscribe(testSubscriber);
testSubscriber.assertValues("Hello", "World", "I love", "RxJava");
}
3.2. 延迟错误合并 (mergeDelayError)
mergeDelayError
与 merge
类似,但遇到错误时会继续发射无错误的数据,最后再抛出异常:
@Test
public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError() {
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
Observable.mergeDelayError(
Observable.from(new String[] { "hello", "world" }),
Observable.error(new RuntimeException("Some exception")),
Observable.from(new String[] { "rxjava" })
).subscribe(testSubscriber);
testSubscriber.assertValues("hello", "world", "rxjava");
testSubscriber.assertError(RuntimeException.class);
}
输出结果:
hello
world
rxjava
⚠️ 关键区别:若改用 merge
,字符串 "rxjava" 不会发射,因为遇到错误会立即终止数据流。
3.3. 压缩操作 (zip)
zip
操作符将两个序列的值按配对组合:
@Test
public void givenTwoObservables_whenZipped_thenReturnCombinedResults() {
List<String> zippedStrings = new ArrayList<>();
Observable.zip(
Observable.from(new String[] { "Simple", "Moderate", "Complex" }),
Observable.from(new String[] { "Solutions", "Success", "Hierarchy"}),
(str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add);
assertThat(zippedStrings).isNotEmpty();
assertThat(zippedStrings.size()).isEqualTo(3);
assertThat(zippedStrings).contains("Simple Solutions", "Moderate Success", "Complex Hierarchy");
}
3.4. 与时间间隔压缩 (zip with interval)
将数据流与interval压缩,可实现延迟发射数据的效果:
@Test
public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission() {
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable<Long> interval = Observable.interval(1L, TimeUnit.SECONDS);
Observable
.zip(data, interval, (strData, tick) -> String.format("[%d]=%s", tick, strData))
.toBlocking().subscribe(testSubscriber);
testSubscriber.assertCompleted();
testSubscriber.assertValueCount(5);
testSubscriber.assertValues("[0]=one", "[1]=two", "[2]=three", "[3]=four", "[4]=five");
}
4. 总结
本文介绍了 RxJava 中组合 Observable 的几种核心方法。其他高级操作符如:
可参考RxJava 官方文档深入学习。
本文源码见 GitHub 仓库。