1. 简介
Kotlin Flow 提供了一套简洁高效的 API,用于处理异步数据流。在实际开发中,我们经常会遇到这样的需求:将两个 Flow 按顺序拼接,确保第一个 Flow 完全发射完毕后,再开始第二个 Flow 的发射。
本文将系统性地介绍多种实现 Flow 顺序拼接的方法,并分析其适用场景,帮助你在不同业务背景下做出合理选择,避免踩坑 ❌。
2. 使用自定义 Flow 构建器
最直观的方式是通过 flow { }
构建器手动控制收集顺序:
fun concatenateFlowsUsingCustomBuilder(flow1: Flow<Int>, flow2: Flow<Int>): Flow<Int> = flow {
flow1.collect { emit(it) }
flow2.collect { emit(it) }
}
✅ 原理说明:
- 先对
flow1
调用collect
,逐个emit
其元素; flow1
完成后,再处理flow2
;- 天然保证了顺序性,且无额外依赖。
📌 测试验证:
@Test
fun `concatenate two flows using custom flow builder`() = runBlocking {
val flow1 = flowOf(1, 2, 3)
val flow2 = flowOf(4, 5, 6)
val result = concatenateFlowsUsingCustomBuilder(flow1, flow2).toList()
assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
}
⚠️ 注意:这种方式虽然简单,但如果 Flow 数据量大或存在耗时操作,需注意协程调度与背压问题。
3. 使用 flattenConcat()
方法
如果你有一个 Flow<Flow<T>>
类型的数据源(即“流的流”),可以使用 flattenConcat()
实现顺序扁平化:
@Test
fun `concatenate two flows using flattenConcat method`() = runBlocking {
val flow1 = flowOf(1, 2, 3)
val flow2 = flowOf(4, 5, 6)
val result = flowOf(flow1, flow2).flattenConcat().toList()
assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
}
✅ 关键点:
flowOf(flow1, flow2)
创建了一个包含两个子流的外层 Flow;flattenConcat()
会依次订阅每个子流,前一个结束才启动下一个;- 内部已做优化,适合处理多个连续 Flow 的场景。
📌 适用于需要动态构建 Flow 列表并顺序执行的场景,比如批量任务串行化。
4. 使用 onCompletion()
方法
利用 Flow 的生命周期钩子 onCompletion
,可以在流完成时触发后续逻辑:
@Test
fun `concatenate two flows using onCompletion method`() = runBlocking {
val flow1 = flowOf(1, 2, 3)
val flow2 = flowOf(4, 5, 6)
val result = flow1.onCompletion { emitAll(flow2) }.toList()
assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
}
✅ 优势:
- 声明式写法,代码简洁;
emitAll(flow2)
自动处理flow2
的所有发射项;
⚠️ 注意事项:
onCompletion
在成功完成或异常终止时都会触发,若需精确控制,应结合catch
使用;- 不推荐在
onCompletion
中做复杂逻辑,保持轻量。
5. 使用 collect()
和 emitAll()
组合
这是第 2 节的增强版写法,利用 emitAll
简化第二段 Flow 的发射:
fun concatenateFlowsUsingEmitAll(flow1: Flow<Int>, flow2: Flow<Int>): Flow<Int> = flow {
flow1.collect { emit(it) }
emitAll(flow2)
}
✅ 对比优势:
- 相比手动
collect + emit
,emitAll
更高效且语义清晰; - 避免了嵌套 lambda,可读性更好。
测试代码:
@Test
fun `concatenate two flows using collect and emitAll method`() = runBlocking {
val flow1 = flowOf(1, 2, 3)
val flow2 = flowOf(4, 5, 6)
val result = concatenateFlowsUsingEmitAll(flow1, flow2).toList()
assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
}
📌 推荐作为默认实现方式之一,尤其适合封装成工具函数。
6. 使用 Reactor 的 Flux.concat()
对于已经引入 Project Reactor 技术栈的项目(如 Spring WebFlux),可以借助 Flux.concat()
实现高性能拼接。
6.1 添加依赖
Maven 配置:
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-reactor</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.11</version>
</dependency>
Gradle 配置:
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.6.0"
implementation "io.projectreactor:reactor-core:3.4.11"
6.2 转换与拼接
fun concatenateFlowsUsingReactive(flow1: Flow<Int>, flow2: Flow<Int>): Flow<Int> {
val flux1 = flow1.asFlux()
val flux2 = flow2.asFlux()
return Flux.concat(flux1, flux2).asFlow()
}
✅ 核心流程:
asFlux()
将 Kotlin Flow 转为 Reactor 的Flux
;- 使用
Flux.concat(flux1, flux2)
实现高效串行合并; asFlow()
将结果转回 Kotlin Flow。
测试示例:
@Test
fun `concatenate two flows using reactive`() = runBlocking {
val flow1 = flowOf(1, 2, 3)
val flow2 = flowOf(4, 5, 6)
val result = concatenateFlowsUsingReactive(flow1, flow2).toList()
assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
}
⚠️ 适用建议:
- ✅ 已使用 Reactor 的项目可无缝集成;
- ❌ 单纯为了拼接 Flow 而引入 Reactor 属于过度设计,增加复杂度。
7. 总结
方法 | 优点 | 缺点 | 推荐场景 |
---|---|---|---|
自定义 Flow 构建器 | 简单直接,无依赖 | 手动控制略啰嗦 | 小型项目、学习理解原理 |
flattenConcat() |
支持多流动态拼接 | 需构造 Flow<Flow<T>> |
批量任务串行执行 |
onCompletion { emitAll() } |
声明式语法,简洁 | 触发时机需注意异常情况 | 快速串联两个固定 Flow |
collect + emitAll |
清晰高效,易封装 | 稍显底层 | 推荐作为通用方案 |
Flux.concat() |
性能优,背压处理好 | 引入额外依赖 | 已接入 Reactor 生态 |
📌 选型建议:
- 日常开发优先使用
collect + emitAll
或flattenConcat()
; - 若追求极致性能且技术栈兼容,可考虑 Reactor 方案;
- 避免为了炫技而滥用复杂方案,简单可控才是生产环境的王道 ✅。