1. 概述

本教程将介绍 RxJava 中的 Hooks(钩子)机制,并通过简短的示例演示不同场景下这些钩子的使用方式。

2. 什么是 RxJava Hooks?

顾名思义,RxJava Hooks 允许我们在 Observable、Completable、Maybe、Flowable 和 Single 的生命周期中插入自定义逻辑。此外,RxJava 还允许我们为 Schedulers 返回的调度器添加生命周期钩子,甚至可以设置一个全局的错误处理器。

在 RxJava 1 中,使用的是 RxJavaHooks 类来定义钩子。但在 RxJava 2 中,钩子机制被完全重写,**RxJavaHooks 已不再可用,取而代之的是 RxJavaPlugins**。

RxJavaPlugins 提供了多个 setter 方法用于设置各种钩子。这些钩子是全局的,一旦设置,除非调用 RxJavaPlugins.reset() 方法或重新设置特定的钩子,否则它们将一直生效。

3. 错误处理钩子

我们可以使用 setErrorHandler() 方法来处理那些由于下游生命周期已经终止而无法被正常发出的异常。

示例代码如下:

RxJavaPlugins.setErrorHandler(throwable -> {
    hookCalled = true;
});

Observable.error(new IllegalStateException()).subscribe();

assertTrue(hookCalled);

并非所有异常都会原样抛出。RxJava 会判断该异常是否属于“已知的 bug 类型”,如果是,则原样抛出;否则,会将其封装为 UndeliverableException。以下几种异常会被视为 bug 类型:

  • OnErrorNotImplementedException:用户未在 subscribe() 中添加 onError 回调
  • MissingBackpressureException:背压问题导致的异常
  • IllegalStateException:协议违规
  • NullPointerException:空指针异常
  • IllegalArgumentException:非法参数
  • CompositeException:在处理异常时再次发生异常

踩坑提示:不要忽略全局错误钩子,否则未捕获的异常可能会导致应用崩溃。

4. Completable 的钩子

RxJava 的 Completable 提供了两个生命周期钩子。

4.1. setOnCompletableAssembly

当 RxJava 实例化 Completable 的操作符或数据源时,会调用该钩子。你可以在这个钩子中对当前的 Completable 对象进行任意操作:

RxJavaPlugins.setOnCompletableAssembly(completable -> {
    hookCalled = true;
    return completable;
});

Completable.fromSingle(Single.just(1));

assertTrue(hookCalled);

4.2. setOnCompletableSubscribe

当有订阅者订阅 Completable 之前,会触发该钩子:

RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> {
    hookCalled = true;
    return observer;
});

Completable.fromSingle(Single.just(1)).test();

assertTrue(hookCalled);

5. Observable 的钩子

RxJava 为 Observable 提供了三个生命周期钩子。

5.1. setOnObservableAssembly

在实例化 Observable 的操作符或数据源时触发:

RxJavaPlugins.setOnObservableAssembly(observable -> {
    hookCalled = true;
    return observable;
});

Observable.range(1, 10);

assertTrue(hookCalled);

5.2. setOnObservableSubscribe

在订阅者订阅 Observable 之前触发:

RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
    hookCalled = true;
    return observer;
});

Observable.range(1, 10).test();

assertTrue(hookCalled);

5.3. setOnConnectableObservableAssembly

该钩子用于 ConnectableObservable,它在 connect() 被调用时才会开始发射数据:

RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> {
    hookCalled = true;
    return connectableObservable;
});

ConnectableObservable.range(1, 10).publish().connect();

assertTrue(hookCalled);

6. Flowable 的钩子

以下是 Flowable 提供的生命周期钩子。

6.1. setOnFlowableAssembly

在实例化 Flowable 的操作符或数据源时触发:

RxJavaPlugins.setOnFlowableAssembly(flowable -> {
    hookCalled = true;
    return flowable;
});

Flowable.range(1, 10);

assertTrue(hookCalled);

6.2. setOnFlowableSubscribe

在订阅者订阅 Flowable 之前触发:

RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> {
    hookCalled = true;
    return observer;
});

Flowable.range(1, 10).test();

assertTrue(hookCalled);

6.3. setOnConnectableFlowableAssembly

用于 ConnectableFlowable,在 connect() 被调用时触发:

RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> {
    hookCalled = true;
    return connectableFlowable;
});

ConnectableFlowable.range(1, 10).publish().connect();

assertTrue(hookCalled);

6.4. setOnParallelAssembly

用于 ParallelFlowable,实现多个发布者的并行处理:

RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> {
    hookCalled = true;
    return parallelFlowable;
});

Flowable.range(1, 10).parallel();

assertTrue(hookCalled);

7. Maybe 的钩子

Maybe 提供了两个生命周期钩子。

7.1. setOnMaybeAssembly

在实例化 Maybe 时触发:

RxJavaPlugins.setOnMaybeAssembly(maybe -> {
    hookCalled = true;
    return maybe;
});

Maybe.just(1);

assertTrue(hookCalled);

7.2. setOnMaybeSubscribe

在订阅者订阅 Maybe 之前触发:

RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> {
    hookCalled = true;
    return observer;
});

Maybe.just(1).test();

assertTrue(hookCalled);

8. Single 的钩子

Single 同样提供了两个基本钩子。

8.1. setOnSingleAssembly

在实例化 Single 时触发:

RxJavaPlugins.setOnSingleAssembly(single -> {
    hookCalled = true;
    return single;
});

Single.just(1);

assertTrue(hookCalled);

8.2. setOnSingleSubscribe

在订阅者订阅 Single 之前触发:

RxJavaPlugins.setOnSingleSubscribe((single, observer) -> {
    hookCalled = true;
    return observer;
});

Single.just(1).test();

assertTrue(hookCalled);

9. Schedulers 的钩子

除了数据流的钩子,RxJava 的 Schedulers 也提供了多个钩子用于控制调度器的行为。

9.1. setScheduleHandler

无论使用哪种调度器,在调度任务时都会触发该钩子:

RxJavaPlugins.setScheduleHandler((runnable) -> {
    hookCalled = true;
    return runnable;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.single())
  .test();

hookCalled = false;

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.computation())
  .test();

assertTrue(hookCalled);

9.2. Computation 调度器钩子

提供两个钩子:setInitComputationSchedulerHandlersetComputationSchedulerHandler

RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.computation())
  .test();

assertTrue(hookCalled && initHookCalled);

9.3. IO 调度器钩子

同样提供两个钩子:setInitIoSchedulerHandlersetIoSchedulerHandler

RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setIoSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.io())
  .test();

assertTrue(hookCalled && initHookCalled);

9.4. Single 调度器钩子

RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.single())
  .test();

assertTrue(hookCalled && initHookCalled);

9.5. NewThread 调度器钩子

RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 15)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.newThread())
  .test();

assertTrue(hookCalled && initHookCalled);

10. 总结

本文介绍了 RxJava 中的各类生命周期钩子,以及如何使用它们。其中,错误处理钩子最为重要,其他钩子可用于审计、日志记录等场景。

如需查看完整示例代码,请访问 GitHub 仓库


原始标题:RxJava Hooks