1. 概述
本文将深入探讨 RxJava 中处理 Observable 的实用工具操作符,并展示如何实现自定义操作符。
操作符本质是一个函数:它接收上游 Observable
2. Do 系列操作符
这些操作符用于在 Observable 生命周期事件中插入自定义行为。
2.1 核心操作符
- ✅ doOnNext:在 onNext 调用时执行指定动作
- ✅ doOnCompleted:当 Observable 正常终止(调用 onCompleted)时执行动作
Observable.range(1, 10)
.doOnNext(r -> receivedTotal += r)
.doOnCompleted(() -> result = "Completed")
.subscribe();
assertTrue(receivedTotal == 55);
assertTrue(result.equals("Completed"));
2.2 扩展操作符
- ⚠️ doOnEach:为每个发射项注册回调,拦截 onNext/onError/onCompleted
- ⚠️ doOnSubscribe:当观察者订阅时执行动作
- ⚠️ doOnUnsubscribe:取消订阅时执行动作(与 doOnSubscribe 相反)
Observable.range(1, 10)
.doOnEach(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Complete");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Integer value) {
receivedTotal += value;
}
})
.doOnSubscribe(() -> result = "Subscribed")
.subscribe();
assertTrue(receivedTotal == 55);
assertTrue(result.equals("Subscribed"));
2.3 错误处理操作符
- ❌ doOnError:当 Observable 因错误终止时执行动作
- ✅ doOnTerminate:无论成功或失败,终止时都执行动作
- ✅ doAfterTerminate:终止后执行动作(替代已废弃的 finallyDo)
thrown.expect(OnErrorNotImplementedException.class);
Observable.empty()
.single()
.doOnError(throwable -> { throw new RuntimeException("error");})
.doOnTerminate(() -> result += "doOnTerminate")
.doAfterTerminate(() -> result += "_doAfterTerminate")
.subscribe();
assertTrue(result.equals("doOnTerminate_doAfterTerminate"));
3. 线程控制:ObserveOn vs SubscribeOn
默认情况下,Observable 及其操作符链在调用 subscribe 的线程上运行。
3.1 ObserveOn 操作符
指定 Observable 用于发送通知的 Scheduler:
Observable.range(1, 5)
.map(i -> i * 100)
.doOnNext(i -> {
emittedTotal += i;
System.out.println("Emitting " + i
+ " on thread " + Thread.currentThread().getName());
})
.observeOn(Schedulers.computation())
.map(i -> i * 10)
.subscribe(i -> {
receivedTotal += i;
System.out.println("Received " + i + " on thread "
+ Thread.currentThread().getName());
});
Thread.sleep(2000);
assertTrue(emittedTotal == 1500);
assertTrue(receivedTotal == 15000);
关键点:
- 元素在 main 线程产生并传递到第一个 map
- observeOn 后续处理切换到 computation 线程
- ⚠️ 踩坑提示:下游处理速度可能超过上游,需注意背压问题
3.2 SubscribeOn 操作符
指定 Observable 自身运行的 Scheduler:
Observable.range(1, 5)
.map(i -> i * 100)
.doOnNext(i -> {
emittedTotal += i;
System.out.println("Emitting " + i
+ " on thread " + Thread.currentThread().getName());
})
.subscribeOn(Schedulers.computation())
.map(i -> i * 10)
.subscribe(i -> {
receivedTotal += i;
System.out.println("Received " + i + " on thread "
+ Thread.currentThread().getName());
});
Thread.sleep(2000);
assertTrue(emittedTotal == 1500);
assertTrue(receivedTotal == 15000);
核心区别: | 操作符 | 作用范围 | 使用数量 | 线程切换 | |--------|----------|----------|----------| | subscribeOn | 影响整个链 | 仅一个 | 指定源发射线程 | | observeOn | 影响下游 | 多个 | 可多次切换 |
4. Single 和 SingleOrDefault
4.1 Single 操作符
返回仅发射单个项的 Observable,源必须严格发射一个元素:
Observable.range(1, 1)
.single()
.subscribe(i -> receivedTotal += i);
assertTrue(receivedTotal == 1);
当源发射零个或多个元素时抛出异常:
Observable.empty()
.single()
.onErrorReturn(e -> receivedTotal += 10)
.subscribe();
assertTrue(receivedTotal == 10);
4.2 SingleOrDefault 操作符
类似 single,但允许指定默认值:
Observable.empty()
.singleOrDefault("Default")
.subscribe(i -> result +=i);
assertTrue(result.equals("Default"));
注意:当源发射多个元素时仍抛出异常:
Observable.range(1, 3)
.singleOrDefault(5)
.onErrorReturn(e -> receivedTotal += 10)
.subscribe();
assertTrue(receivedTotal == 10);
4.3 使用场景总结
- ✅ 源可能发射零或一个元素 → 用 singleOrDefault
- ✅ 需处理多个元素 → 用 first 或 last
5. Timestamp 操作符
简单粗暴:为每个发射项附加时间戳:
Observable.range(1, 10)
.timestamp()
.map(o -> result = o.getClass().toString() )
.last()
.subscribe();
assertTrue(result.equals("class rx.schedulers.Timestamped"));
6. Delay 操作符
延迟发射:在发射每个元素前暂停指定时间:
Observable source = Observable.interval(1, TimeUnit.SECONDS)
.take(5)
.timestamp();
Observable delayedObservable
= source.delay(2, TimeUnit.SECONDS);
source.subscribe(
value -> System.out.println("source :" + value),
t -> System.out.println("source error"),
() -> System.out.println("source completed"));
delayedObservable.subscribe(
value -> System.out.println("delay : " + value),
t -> System.out.println("delay error"),
() -> System.out.println("delay completed"));
Thread.sleep(8000);
替代方案:delaySubscription 延迟订阅而非发射,默认使用 computation 调度器,可自定义。
7. Repeat 操作符
拦截完成通知:收到上游完成信号后重新订阅,形成循环:
Observable.range(1, 3)
.repeat(3)
.subscribe(i -> receivedTotal += i);
assertTrue(receivedTotal == 18);
⚠️ 注意:当上游是动态流时,重复的事件序列可能不同。
8. Cache 操作符
智能缓存机制:
- 首次订阅时,委托给底层 Observable 并缓存所有通知
- 后续订阅直接使用缓存值,不再调用底层源
Observable<Integer> source =
Observable.<Integer>create(subscriber -> {
System.out.println("Create");
subscriber.onNext(receivedTotal += 5);
subscriber.onCompleted();
}).cache();
source.subscribe(i -> {
System.out.println("element 1");
receivedTotal += 1;
});
source.subscribe(i -> {
System.out.println("element 2");
receivedTotal += 2;
});
assertTrue(receivedTotal == 8);
9. Using 操作符
资源管理利器:自动管理资源生命周期:
- 订阅时创建资源
- 使用资源创建 Observable
- 取消订阅或终止时释放资源
Observable<Character> values = Observable.using(
() -> "resource",
r -> {
return Observable.create(o -> {
for (Character c : r.toCharArray()) {
o.onNext(c);
}
o.onCompleted();
});
},
r -> System.out.println("Disposed: " + r)
);
values.subscribe(
v -> result += v,
e -> result += e
);
assertTrue(result.equals("resource"));
10. 总结
RxJava 的真正威力在于其操作符系统。通过声明式转换数据流,我们获得了:
- ✅ 安全性
- ✅ 表达力
- ✅ 灵活性
掌握内置操作符是使用 RxJava 的关键。函数式编程基础决定了操作符在 RxJava 生态中的核心地位。
本文所有示例代码可在 GitHub 获取完整项目源码。