1. 引言

在选择任何技术方案时,首要问题是:我们要解决什么问题?

RxJava 及其扩展 RxKotlin 是响应式宣言(Reactive Manifesto)的实现,要求应用具备响应性(Responsive)、弹性(Resilient)、伸缩性(Elastic)和消息驱动(Message-Driven)。这些系统通常采用异步通信、对数据源施加背压(back-pressure),并在系统故障时优雅降级。

这是我们比较协程(Coroutines)与响应式流(Reactive Streams)库的基础。如果两者都适用,我们需要比较哪一种更具可读性和更适合生产环境运行。

然而,Kotlin 协程库并不是 RxKotlin 的直接竞争者。它的适用范围更广。Kotlin 协程主要由两个概念构成:

  • suspend 关键字,这是 Kotlin 语言本身的一部分;
  • kotlinx.coroutines 库,它提供了 suspend 的默认实现。

suspend 关键字保证了协程之间的协作式并发:在调用 suspend 函数时,控制权可以交给其他协程继续执行。

因此,协程的核心理念并不涉及消息传递、背压或异步,它主要是为了在 CPU 密集型任务之间实现非阻塞等待,从而让其他协程更高效地利用 CPU。当然,通过提供轻量级线程模型(协程),这个库也提供了一种新颖的异步编程方式。


2. 响应式系统应具备响应性

我们先看看响应式宣言中提到的“响应性”:系统响应应快速、一致,问题应能快速被发现并处理,即“快速失败(fail fast)”。

协程支持快速失败机制,例如:

withTimeout(100.milliseconds) {
    delay(3.seconds)
    // 会抛出 TimeoutCancellationException
}

响应性还意味着要合理管理资源,让可处理的请求优先执行,同时保留一部分资源用于控制回路。协程在这方面表现良好,因为它们在等待 IO 时不占用线程资源。

但要注意:协程调度器(CoroutineDispatcher)可能因阻塞代码而“饥饿”

repeat(3) {
    starvingContext.launch {
        Thread.sleep(30000) // 阻塞线程
    }
}

runBlocking {
    withTimeout(100.milliseconds) {
        starvingContext.launch { println("A quick task which will never execute") }.join()
    }
}

如果我们使用非阻塞的等待函数 delay(),就能正常输出:

repeat(3) {
    starvingContext.launch {
        delay(30000) // 非阻塞
    }
}
runBlocking {
    withTimeout(100.milliseconds) {
        withContext(workingContext.coroutineContext) {
            println("This messages gets to be printed")
        }
    }
}

再来看 RxJava 的响应性实现:

Observable
    .fromCallable {
        Thread.sleep(30000)
        "result"
    }
    .subscribeOn(worker)
    .timeout(100, TimeUnit.MILLISECONDS)
    .subscribe({ msg -> println(msg) }, { worker.shutdown() })

RxJava 的超时处理是通过操作符链添加的,而不是直接包裹在操作周围。这虽然功能上没问题,但不如协程显式并发模型清晰 —— 比如你无法一眼看出 onNextonError 是在哪个线程执行的。

RxJava 使用 Scheduler 来管理线程模型:

val worker = Schedulers.computation() // 或 .io(), .single()

虽然比协程的 Scope + Context + Dispatcher 更简单,但仍需手动管理线程池大小和调度策略


3. 响应式系统应具备弹性

响应式宣言指出:弹性来源于复制(Replication)、隔离(Isolation)、封装(Containment)和委派(Delegation),即每个组件的恢复应由外部组件处理。

在 Kotlin 协程中,异常就是普通的异常。没有像 ArrowKt 这样的库时,你无法强制在某个层级处理异常。Kotlin 的异常都是未声明的(unchecked)。

RxJava 的异常处理方式略有不同:

Observable.error<CustomException>(CustomException())
    .subscribe({ msg -> println(msg) }, { ex -> failed = ex !is CustomException })
    .dispose()

RxJava 的错误处理不是强制的,默认行为是忽略错误并打印到 System.err。这可能是个缺点,因为协程中未捕获的异常会上抛,直到应用崩溃。

协程通过上下文(Context)进行组件监督,上下文形成层级结构。父协程负责子协程的生命周期和异常处理。默认情况下,一个子协程失败会导致整个协程树终止。

虽然协程在异常处理上不如 RxJava 严谨,但通过引入函数式库(如 ArrowKt)可以弥补。


4. 响应式系统应具备伸缩性

在 Java Fibers(Loom 项目)落地之前,Kotlin 协程在资源成本上更具优势

在伸缩性方面,RxJava 和协程都能使用线程池动态扩展:

Schedulers.computation() // 或 cached 线程池
val dispatcher = Dispatchers.IO.limitedParallelism(4)

但线程是昂贵资源,协程更轻量,可以并发更多任务。

协程优势:更少资源消耗,适合高并发场景。
RxJava 限制:依赖线程池,资源开销大。


5. 响应式系统应为消息驱动

协程是比 Rx 更底层的概念。虽然协程提供了如 FlowChannel 这样的工具用于消息传递:

val pipeline = Channel<String>()
scopeA.launch {
    (1..10).map {
        pipeline.send(it.toString())
    }
    pipeline.close()
}
withContext(scopeB.coroutineContext) {
    pipeline.consumeAsFlow().map {
        println("Received message: $it")
        it
    }.toList()
}

协程并不强制使用消息传递机制,而 Rx 则必须从创建 Observable 开始,本身就基于消息驱动。

响应式宣言还要求消息通信是异步、支持背压、非阻塞 IO。我们来看协程与 Rx 在这些方面的对比。

5.1 异步通信

协程中异步执行任务非常简单,但需要显式声明:

val a = async { requestOverNetwork(0) }
val b = async { requestOverNetwork(1) }
val c = async { requestOverNetwork(2) }
a.await() + b.await() + c.await()

RxJava 实现类似功能:

Observable.merge(
    listOf(
        observeOverNetwork(0),
        observeOverNetwork(1),
        observeOverNetwork(2)
    )
).reduce { t1, t2 -> t1 + t2 }
    .subscribe(testSubscriber)

虽然功能相同,但协程语法更清晰明了。

5.2 背压机制

协程中可以通过 FlowChannel 实现背压:

  • Flow 内置背压机制;
  • Channel 可配置缓冲区,写入时若缓冲区满则挂起。

相比之下,RxJava 的背压机制更成熟,有多种策略支持(如 onBackpressureBufferonBackpressureDrop)。

协程优势:语言级支持,语法简洁。
RxJava 优势:背压机制更成熟、配置更灵活。

5.3 非阻塞 IO

两者都依赖底层 IO 实现是否为非阻塞。协程的优势在于语言级支持:

private suspend fun AsynchronousFileChannel.asyncRead(dst: ByteBuffer, position: Long = 0): Int = suspendCoroutine {
    read(dst, position, it, object : CompletionHandler<Int, Continuation<Int>> {
        override fun completed(result: Int, attachment: Continuation<Int>) = it.resume(result)
        override fun failed(exc: Throwable, attachment: Continuation<Int>) = it.resumeWithException(exc)
    })
}

使用时:

fileChannel.asyncRead(buffer)

RxJava 也支持非阻塞 IO(如 Netty):

val buffer = ByteBuffer.allocate(13)
Observable.create { emitter ->
    fileChannel.read(buffer, 0, emitter, object : CompletionHandler<Int, ObservableEmitter<String>>{
        override fun completed(result: Int, attachment: ObservableEmitter<String>) {
            emitter.onNext(String(buffer.array()))
            emitter.onComplete()
        }
        override fun failed(exc: Throwable, attachment: ObservableEmitter<String>) {
            emitter.tryOnError(exc)
        }
    })
}.subscribe(testSubscriber)

两者都能实现,但协程语法更简洁。


6. 协程与 Rx 结合使用

协程是比响应式库更底层的抽象。我们可以通过它实现类似 Rx 的功能,也可以在不严格遵循响应式宣言的前提下构建更灵活的应用。

Kotlin 提供了与 Rx 互操作的扩展函数,可以在两者之间无缝转换:

val observable = Observable.just("apple")
val result = observable.awaitSingle()

flow {
    (1..5).forEach { emit(it) }
}.asObservable(this.coroutineContext).subscribe(testSubscriber)

优势:结合使用灵活,适合过渡期或混合项目。
⚠️ 注意:Rx 的侵入性较强,一旦使用 Rx 构建,切换回普通代码会比较麻烦。


7. 总结

通过对比,我们可以得出以下结论:

  • ✅ 协程很好地实现了响应式宣言的核心原则,除了错误处理略显薄弱。
  • ✅ 协程更贴近语言,灵活性高,不强制你使用某种风格。
  • ✅ RxJava 更适合已有响应式经验的团队,但侵入性更强。
  • ✅ 协程更适合构建高并发、低资源消耗的系统。
  • ✅ Kotlin 提供了协程与 Rx 的互操作支持,两者可以共存。

所有示例代码可在 GitHub 上找到。


原始标题:Kotlin Coroutines and RxKotlin Comparison