1. 简介

在本教程中,我们将深入探讨 RxJava 中的 Completable 类型。它表示一个没有实际数据值的异步计算结果,通常用于只关心操作是否成功完成的场景。

2. 添加 RxJava 依赖

在 Maven 项目中添加 RxJava 2 的依赖:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.2</version>
</dependency>

最新版本可从 Maven Central 获取。

3. Completable 类型解析

Completable 和 Observable 类似,但它不发射任何数据项,只发射完成(onComplete)或错误(onError)信号。

创建一个立即完成的 Completable 实例非常简单:

Completable
  .complete()
  .subscribe(new DisposableCompletableObserver() {
    @Override
    public void onComplete() {
        System.out.println("Completed!");
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }
});

我们也可以通过 CallableActionRunnable 创建 Completable:

Completable.fromRunnable(() -> {});

此外,可以从其他响应式类型转换而来:

Flowable<String> flowable = Flowable
  .just("request received", "user logged in");
Completable flowableCompletable = Completable
  .fromPublisher(flowable);
Completable singleCompletable = Single.just(1)
  .ignoreElement();

4. 链式调用 Completable

在一些业务逻辑中,我们可能需要按顺序执行多个 Completable 操作。例如:

  • 更新远程对象后更新本地数据库
  • 完成主任务后记录日志
  • 任务编排,如完成数据导入后再执行分析任务

假设我们有如下几个 Completable:

Completable first = Completable
  .fromSingle(Single.just(1));
Completable second = Completable
  .fromRunnable(() -> {});
Throwable throwable = new RuntimeException();
Completable error = Single.error(throwable)
  .ignoreElement();

使用 andThen() 方法可以将多个 Completable 串行连接起来:

first
  .andThen(second)
  .test()
  .assertComplete();

⚠️ 如果其中任何一个 Completable 出错,整个链路都会终止并触发 onError。

first
  .andThen(second)
  .andThen(error)
  .test()
  .assertError(throwable);

⚠️ 如果某个 Completable 永远不结束(比如 never),那么整个链也不会完成。

...
  .andThen(Completable.never())
  .test()
  .assertNotComplete();

5. 并行组合 Completable

使用 merge() 合并多个 Completable

当我们需要等待多个 Completable 全部完成时,可以使用 merge() 操作符。

只有当所有 Completable 都完成时,合并后的 Completable 才会完成;只要有一个出错,就会触发 onError。

Completable.mergeArray(first, second)
  .test()
  .assertComplete();

Completable.mergeArray(first, second, error)
  .test()
  .assertError(throwable);

使用 flatMapCompletable() 处理流中的每个元素

当我们有一个数据流(如 Flowable),并且希望对每个元素执行一个 Completable 操作时,可以用 flatMapCompletable()

Completable allElementsCompletable = Flowable
  .just("request received", "user logged in")
  .flatMapCompletable(message -> Completable
      .fromRunnable(() -> System.out.println(message))
  );
allElementsCompletable
  .test()
  .assertComplete();

这个方法在 Observable、Maybe、Single 等类型上也有提供。

📌 实际应用场景包括:

  • 对每个事件进行日志记录
  • 每次操作成功后做快照备份

使用 amb() 获取最先完成的结果

有时我们希望同时启动多个 Completable,但只关心哪一个先完成。

amb() 操作符会选择第一个完成或出错的 Completable,并取消其余操作。

Completable.ambArray(first, Completable.never(), second)
  .test()
  .assertComplete();

如果第一个 Completable 出错,则整个结果也会出错:

Completable.ambArray(error, first, second)
  .test()
  .assertError(throwable);

💡 应用场景举例

  • 将同一个备份文件上传到多个服务器,第一个上传成功就完成任务
  • 多路重试机制中,最先成功的那个作为最终结果

6. 总结

本文介绍了 RxJava 中的 Completable 类型及其常见组合方式:

操作符 功能
andThen() 串行执行多个 Completable
merge() 并行等待多个 Completable 完成
flatMapCompletable() 对数据流中每个元素执行 Completable
amb() 只取最先完成的那个 Completable

这些操作符在实际开发中可以帮助我们灵活处理无值响应的异步流程,尤其适用于流程编排、资源清理、任务组合等场景。

完整示例代码可在 GitHub 查看:https://github.com/eugenp/tutorials/tree/master/rxjava-modules/rxjava-core


原始标题:Combining RxJava Completables