1. 概述
本教程将介绍 RxJava 中的 Hooks(钩子)机制,并通过简短的示例演示不同场景下这些钩子的使用方式。
2. 什么是 RxJava Hooks?
顾名思义,RxJava Hooks 允许我们在 Observable、Completable、Maybe、Flowable 和 Single 的生命周期中插入自定义逻辑。此外,RxJava 还允许我们为 Schedulers
返回的调度器添加生命周期钩子,甚至可以设置一个全局的错误处理器。
在 RxJava 1 中,使用的是 RxJavaHooks
类来定义钩子。但在 RxJava 2 中,钩子机制被完全重写,**RxJavaHooks
已不再可用,取而代之的是 RxJavaPlugins
**。
RxJavaPlugins
提供了多个 setter 方法用于设置各种钩子。这些钩子是全局的,一旦设置,除非调用 RxJavaPlugins.reset()
方法或重新设置特定的钩子,否则它们将一直生效。
3. 错误处理钩子
我们可以使用 setErrorHandler()
方法来处理那些由于下游生命周期已经终止而无法被正常发出的异常。
示例代码如下:
RxJavaPlugins.setErrorHandler(throwable -> {
hookCalled = true;
});
Observable.error(new IllegalStateException()).subscribe();
assertTrue(hookCalled);
并非所有异常都会原样抛出。RxJava 会判断该异常是否属于“已知的 bug 类型”,如果是,则原样抛出;否则,会将其封装为 UndeliverableException
。以下几种异常会被视为 bug 类型:
OnErrorNotImplementedException
:用户未在subscribe()
中添加onError
回调MissingBackpressureException
:背压问题导致的异常IllegalStateException
:协议违规NullPointerException
:空指针异常IllegalArgumentException
:非法参数CompositeException
:在处理异常时再次发生异常
✅ 踩坑提示:不要忽略全局错误钩子,否则未捕获的异常可能会导致应用崩溃。
4. Completable 的钩子
RxJava 的 Completable
提供了两个生命周期钩子。
4.1. setOnCompletableAssembly
当 RxJava 实例化 Completable
的操作符或数据源时,会调用该钩子。你可以在这个钩子中对当前的 Completable
对象进行任意操作:
RxJavaPlugins.setOnCompletableAssembly(completable -> {
hookCalled = true;
return completable;
});
Completable.fromSingle(Single.just(1));
assertTrue(hookCalled);
4.2. setOnCompletableSubscribe
当有订阅者订阅 Completable
之前,会触发该钩子:
RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> {
hookCalled = true;
return observer;
});
Completable.fromSingle(Single.just(1)).test();
assertTrue(hookCalled);
5. Observable 的钩子
RxJava 为 Observable
提供了三个生命周期钩子。
5.1. setOnObservableAssembly
在实例化 Observable
的操作符或数据源时触发:
RxJavaPlugins.setOnObservableAssembly(observable -> {
hookCalled = true;
return observable;
});
Observable.range(1, 10);
assertTrue(hookCalled);
5.2. setOnObservableSubscribe
在订阅者订阅 Observable
之前触发:
RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
hookCalled = true;
return observer;
});
Observable.range(1, 10).test();
assertTrue(hookCalled);
5.3. setOnConnectableObservableAssembly
该钩子用于 ConnectableObservable
,它在 connect()
被调用时才会开始发射数据:
RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> {
hookCalled = true;
return connectableObservable;
});
ConnectableObservable.range(1, 10).publish().connect();
assertTrue(hookCalled);
6. Flowable 的钩子
以下是 Flowable
提供的生命周期钩子。
6.1. setOnFlowableAssembly
在实例化 Flowable
的操作符或数据源时触发:
RxJavaPlugins.setOnFlowableAssembly(flowable -> {
hookCalled = true;
return flowable;
});
Flowable.range(1, 10);
assertTrue(hookCalled);
6.2. setOnFlowableSubscribe
在订阅者订阅 Flowable
之前触发:
RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> {
hookCalled = true;
return observer;
});
Flowable.range(1, 10).test();
assertTrue(hookCalled);
6.3. setOnConnectableFlowableAssembly
用于 ConnectableFlowable
,在 connect()
被调用时触发:
RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> {
hookCalled = true;
return connectableFlowable;
});
ConnectableFlowable.range(1, 10).publish().connect();
assertTrue(hookCalled);
6.4. setOnParallelAssembly
用于 ParallelFlowable
,实现多个发布者的并行处理:
RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> {
hookCalled = true;
return parallelFlowable;
});
Flowable.range(1, 10).parallel();
assertTrue(hookCalled);
7. Maybe 的钩子
Maybe
提供了两个生命周期钩子。
7.1. setOnMaybeAssembly
在实例化 Maybe
时触发:
RxJavaPlugins.setOnMaybeAssembly(maybe -> {
hookCalled = true;
return maybe;
});
Maybe.just(1);
assertTrue(hookCalled);
7.2. setOnMaybeSubscribe
在订阅者订阅 Maybe
之前触发:
RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> {
hookCalled = true;
return observer;
});
Maybe.just(1).test();
assertTrue(hookCalled);
8. Single 的钩子
Single
同样提供了两个基本钩子。
8.1. setOnSingleAssembly
在实例化 Single
时触发:
RxJavaPlugins.setOnSingleAssembly(single -> {
hookCalled = true;
return single;
});
Single.just(1);
assertTrue(hookCalled);
8.2. setOnSingleSubscribe
在订阅者订阅 Single
之前触发:
RxJavaPlugins.setOnSingleSubscribe((single, observer) -> {
hookCalled = true;
return observer;
});
Single.just(1).test();
assertTrue(hookCalled);
9. Schedulers 的钩子
除了数据流的钩子,RxJava 的 Schedulers
也提供了多个钩子用于控制调度器的行为。
9.1. setScheduleHandler
无论使用哪种调度器,在调度任务时都会触发该钩子:
RxJavaPlugins.setScheduleHandler((runnable) -> {
hookCalled = true;
return runnable;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.single())
.test();
hookCalled = false;
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.computation())
.test();
assertTrue(hookCalled);
9.2. Computation 调度器钩子
提供两个钩子:setInitComputationSchedulerHandler
和 setComputationSchedulerHandler
:
RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.computation())
.test();
assertTrue(hookCalled && initHookCalled);
9.3. IO 调度器钩子
同样提供两个钩子:setInitIoSchedulerHandler
和 setIoSchedulerHandler
:
RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setIoSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.io())
.test();
assertTrue(hookCalled && initHookCalled);
9.4. Single 调度器钩子
RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.single())
.test();
assertTrue(hookCalled && initHookCalled);
9.5. NewThread 调度器钩子
RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 15)
.map(v -> v * 2)
.subscribeOn(Schedulers.newThread())
.test();
assertTrue(hookCalled && initHookCalled);
10. 总结
本文介绍了 RxJava 中的各类生命周期钩子,以及如何使用它们。其中,错误处理钩子最为重要,其他钩子可用于审计、日志记录等场景。
如需查看完整示例代码,请访问 GitHub 仓库。