1. 概述
本教程我们将学习 如何使用 RxJava2 的操作符将传统的同步和异步 API 转换为 Observable。
我们会编写几个简单的函数,帮助我们深入理解这些操作符的使用方式。
2. Maven 依赖
首先,我们需要引入 RxJava2 和 RxJava2Extensions 依赖:
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>com.github.akarnokd</groupId>
<artifactId>rxjava2-extensions</artifactId>
<version>0.20.4</version>
</dependency>
3. 操作符介绍
RxJava2 提供了 大量操作符 来应对响应式编程的各种场景。
不过我们只会重点介绍一些 将同步或异步方法转换为 Observable 的常用操作符。这些操作符通常接收一个函数作为参数,并将函数返回值发射出去。
此外,RxJava2 还提供了一些扩展操作符用于增强功能。
接下来我们看看如何使用这些操作符来转换同步和异步方法。
4. 同步方法转换
4.1. 使用 fromCallable()
该操作符会返回一个 Observable,当有订阅者订阅时,会调用传入的函数,并将函数返回值发射出去。
我们先定义一个返回整数的函数:
AtomicInteger counter = new AtomicInteger();
Callable<Integer> callable = () -> counter.incrementAndGet();
接着将其转换为 Observable 并订阅测试:
Observable<Integer> source = Observable.fromCallable(callable);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(i);
assertEquals(i, counter.get());
}
✅ fromCallable()
是懒执行的,每次订阅都会重新调用函数。我们通过循环创建多个订阅者来验证这一点。
由于响应式流默认是异步的,订阅者会立即返回。在实际场景中,函数执行可能有延迟,所以我们设置了 最多等待 5 秒 来获取结果。
📌 我们使用了 Observable.test()
方法来测试。这个方法非常实用,会创建一个 TestObserver
并自动订阅。
4.2. 使用 start()
start()
是 RxJava2Extensions 模块中的一个操作符。它会 异步调用传入的函数,并返回一个发射结果的 Observable:
Observable<Integer> source = AsyncObservable.start(callable);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
assertEquals(1, counter.get());
}
⚠️ 注意:函数是立即执行的,而不是在订阅时才执行。多个订阅者会观察到相同的返回值。
5. 异步方法转换
5.1. 使用 fromFuture()
我们知道,在 Java 中实现异步最常见的方法是使用 Future
。fromFuture()
接收一个 Future
,并通过 Future.get()
获取结果并发射出去。
先将之前的函数异步化:
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(callable);
然后测试转换结果:
Observable<Integer> source = Observable.fromFuture(future);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
assertEquals(1, counter.get());
}
executor.shutdown();
⚠️ 同样地,每个订阅者都会观察到相同的返回值。
💡 注意:Observable.dispose()
用于防止内存泄漏,但对 Future.get()
的阻塞调用无效。我们可以通过组合 doOnDispose()
和 future.cancel(true)
来确保取消 Future:
source.doOnDispose(() -> future.cancel(true));
5.2. 使用 startFuture()
顾名思义,这个操作符会 立即启动传入的 Future,并在订阅时发射返回值。与 fromFuture()
缓存结果不同,每次订阅都会重新执行异步方法:
ExecutorService executor = Executors.newSingleThreadExecutor();
Observable<Integer> source = AsyncObservable.startFuture(() -> executor.submit(callable));
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(i);
assertEquals(i, counter.get());
}
executor.shutdown();
✅ 每次订阅都重新执行,返回值递增,符合预期。
5.3. 使用 deferFuture()
这个操作符用于处理返回 Observable 的异步方法。它会 聚合多个 Observable 并返回一个发射多个值的流。每次订阅都会重新执行传入的工厂方法。
我们先创建一个异步工厂函数:
List<Integer> list = Arrays.asList(new Integer[] { counter.incrementAndGet(),
counter.incrementAndGet(), counter.incrementAndGet() });
ExecutorService exec = Executors.newSingleThreadExecutor();
Callable<Observable<Integer>> callable = () -> Observable.fromIterable(list);
然后测试:
Observable<Integer> source = AsyncObservable.deferFuture(() -> exec.submit(callable));
for (int i = 1; i < 4; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1,2,3);
}
exec.shutdown();
✅ 每次订阅都会发射 1,2,3
,说明工厂函数被重新调用了。
6. 总结
在这篇教程中,我们学习了如何使用 RxJava2 将同步和异步方法转换为 Observable。
虽然示例都比较简单,但 RxJava2 的能力远不止于此。你可以将其用于更复杂的场景,比如视频流处理、分批数据传输等。
一如既往,本文中的所有示例代码都可以在 GitHub 项目 中找到。