1. 概述

在 Android 或后端 Kotlin 开发中,我们可能正在使用 RxJavaSingle,也可能逐渐转向更现代的 Kotlin Coroutines。两者都用于处理异步操作,避免阻塞主线程。

但项目迁移不可能一蹴而就。一个稳妥的做法是:**逐步将 RxJava 的 Single 转为协程中的 Deferred**,这样既能保留现有逻辑,又能享受协程的简洁语法。

本文将探讨 **如何安全、高效地将 RxJava 的 Single<T> 转换为 Kotlin 的 Deferred<T>**,并分析每种方案的适用场景和潜在“坑点”。


2. Single 与 Deferred 对比

特性 Single (RxJava) Deferred (Kotlin Coroutines)
用途 表示一个只会发射一个结果或错误的 Observable 表示一个将来可用的结果(类似 Future)
典型场景 API 请求、数据库查询等单次异步操作 协程中启动异步任务,后续通过 await() 获取结果
结果获取方式 subscribe()blockingGet() await()
异常处理 通过 onError() 回调 使用 try/catch 包裹 await()
是否可取消 支持通过 Disposable 取消 支持通过 Job.cancel() 取消

✅ 总结:两者语义高度相似——都是为了表达“一个异步操作最终会返回一个结果或失败”。因此转换是合理且常见的需求。


3. 将 Single 转换为 Deferred 的方法

为了演示,我们先定义一个商品类和测试数据:

data class Product(val id: Int, val name: String, val price: Double)

private val allProducts = listOf(
    Product(1, "Samsung", 1200.0),
    Product(2, "Oppo", 800.0),
    Product(3, "Nokia", 450.0),
    Product(4, "Lenovo", 550.0),
    Product(5, "ASUS", 400.0)
)

接下来创建一个返回 Single<List<Product>> 的函数,筛选价格大于 500 并按价格排序:

private fun getFilteredProducts(): Single<List<Product>> {
    return Single.just(allProducts)
        .map { products ->
            products.sortedBy { it.price }.filter { it.price > 500 }
        }
        .subscribeOn(Schedulers.io())
}

再准备一个扩展函数用于验证结果是否正确:

private suspend fun Deferred<*>.assertOver500AndSorted() {
    assertThat(this.await() as List<*>).containsExactly(
        Product(4, "Lenovo", 550.0),
        Product(2, "Oppo", 800.0),
        Product(1, "Samsung", 1200.0)
    )
}

使用方式如下:

deferred.assertOver500AndSorted()

下面我们尝试四种不同的转换方式。

3.1 使用 async + blockingGet

最直接的方式是利用 async 启动协程,并在其中调用 blockingGet() 阻塞等待结果:

val deferred = async { getFilteredProducts().blockingGet() }

⚠️ 踩坑提醒

  • blockingGet() 会阻塞当前协程所在的线程,虽然不会影响主线程(因为我们用了 Schedulers.io()),但在高并发场景下仍可能浪费线程资源。
  • 如果原始 Single 出错,异常会直接抛出,需外部用 try/catch 捕获。

优点:代码最简单,适合快速原型或低频调用场景。

缺点:本质上仍是阻塞式调用,违背了协程非阻塞的设计哲学。


3.2 使用 CompletableDeferred

CompletableDeferredDeferred 的可完成版本,允许我们手动设置结果或异常。

我们可以将其作为 Single.subscribe() 的回调目标:

val deferred = CompletableDeferred<List<Product>>()
getFilteredProducts().subscribe(
    deferred::complete,
    deferred::completeExceptionally
)
  • complete(value):当 Single 成功发射数据时调用。
  • completeExceptionally(exception):发生错误时调用,自动转为协程中的异常。

优点

  • 完全非阻塞,响应式风格。
  • 异常处理清晰,天然对接协程的异常机制。

⚠️ 注意:务必确保只调用一次 completecompleteExceptionally,否则会抛出 IllegalStateException


3.3 使用 suspendCoroutine 或 suspendCancellableCoroutine

这是更底层但更灵活的方式。suspendCoroutine 允许我们在 suspend 函数中挂起协程,等待回调唤醒。

val deferred = async {
    suspendCoroutine { continuation ->
        getFilteredProducts().subscribe(
            continuation::resume,
            continuation::resumeWithException
        )
    }
}
  • continuation.resume(value) 触发协程恢复。
  • continuation.resumeWithException(e) 以异常结束协程。

如果需要支持取消,应使用 suspendCancellableCoroutine

val deferred = async {
    suspendCancellableCoroutine { continuation ->
        val disposable = getFilteredProducts().subscribe(
            continuation::resume,
            continuation::resumeWithException
        )
        // 当协程被取消时,自动 dispose RxJava 流
        continuation.invokeOnCancellation { disposable.dispose() }
    }
}

优点

  • 真正挂起而非阻塞。
  • 支持取消传播,资源管理更安全。

缺点:代码稍复杂,适合封装成通用工具函数。


3.4 使用 Kotlinx Coroutines Rx3 扩展库

官方提供了 kotlinx-coroutines-rx3 库,专门用于桥接 RxJava 3 与 Kotlin Coroutines。

首先添加依赖:

<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-rx3</artifactId>
    <version>1.8.0</version>
</dependency>

该库为 Single 提供了 await() 扩展函数:

suspend fun <T> SingleSource<T>.await(): T

使用方式极其简洁:

val deferred = async { getFilteredProducts().await() }
  • 成功时返回值。
  • 失败时自动抛出异常,可被 try/catch 捕获。

优点

  • 语法最简洁,语义清晰。
  • 内部实现基于 suspendCancellableCoroutine,支持取消。
  • 官方维护,稳定性高。

缺点:引入额外依赖,如果你项目已经重度使用 RxJava,值得引入;否则可能略显臃肿。


4. 总结与建议

方法 是否推荐 适用场景
async { blockingGet() } ⚠️ 仅临时 快速迁移、低频调用,不追求性能
CompletableDeferred ✅ 推荐 需要精细控制回调逻辑
suspendCoroutine ✅ 可选 封装通用转换工具
suspendCancellableCoroutine ✅ 强烈推荐 需要支持取消的生产环境
kotlinx-coroutines-rx3.await() 首选 已使用 RxJava 且希望无缝集成协程的项目

📌 最终建议

  • 如果你已经在使用 RxJava 且短期内无法完全迁移到 Flow,**强烈推荐引入 kotlinx-coroutines-rx3**,它提供了最优雅、最安全的互操作方式。
  • 若不想引入新依赖,优先选择 suspendCancellableCoroutine 实现,兼顾性能与可取消性。

所有示例代码均可在 GitHub 获取:https://github.com/baeldung/kotlin-tutorials/tree/master/core-kotlin-modules/core-kotlin-concurrency-3


原始标题:Convert RxJava Single to Kotlin Coroutine Deferred