1. 简介

Kotlin Flow 提供了一套简洁高效的 API,用于处理异步数据流。在实际开发中,我们经常会遇到这样的需求:将两个 Flow 按顺序拼接,确保第一个 Flow 完全发射完毕后,再开始第二个 Flow 的发射

本文将系统性地介绍多种实现 Flow 顺序拼接的方法,并分析其适用场景,帮助你在不同业务背景下做出合理选择,避免踩坑 ❌。

2. 使用自定义 Flow 构建器

最直观的方式是通过 flow { } 构建器手动控制收集顺序:

fun concatenateFlowsUsingCustomBuilder(flow1: Flow<Int>, flow2: Flow<Int>): Flow<Int> = flow {
    flow1.collect { emit(it) }
    flow2.collect { emit(it) }
}

原理说明

  • 先对 flow1 调用 collect,逐个 emit 其元素;
  • flow1 完成后,再处理 flow2
  • 天然保证了顺序性,且无额外依赖。

📌 测试验证:

@Test
fun `concatenate two flows using custom flow builder`() = runBlocking {
    val flow1 = flowOf(1, 2, 3)
    val flow2 = flowOf(4, 5, 6)
    val result = concatenateFlowsUsingCustomBuilder(flow1, flow2).toList()

    assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
}

⚠️ 注意:这种方式虽然简单,但如果 Flow 数据量大或存在耗时操作,需注意协程调度与背压问题。

3. 使用 flattenConcat() 方法

如果你有一个 Flow<Flow<T>> 类型的数据源(即“流的流”),可以使用 flattenConcat() 实现顺序扁平化:

@Test
fun `concatenate two flows using flattenConcat method`() = runBlocking {
    val flow1 = flowOf(1, 2, 3)
    val flow2 = flowOf(4, 5, 6)
    val result = flowOf(flow1, flow2).flattenConcat().toList()
        
    assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
}

关键点

  • flowOf(flow1, flow2) 创建了一个包含两个子流的外层 Flow;
  • flattenConcat() 会依次订阅每个子流,前一个结束才启动下一个;
  • 内部已做优化,适合处理多个连续 Flow 的场景。

📌 适用于需要动态构建 Flow 列表并顺序执行的场景,比如批量任务串行化。

4. 使用 onCompletion() 方法

利用 Flow 的生命周期钩子 onCompletion,可以在流完成时触发后续逻辑:

@Test
fun `concatenate two flows using onCompletion method`() = runBlocking {
    val flow1 = flowOf(1, 2, 3)
    val flow2 = flowOf(4, 5, 6)
    val result = flow1.onCompletion { emitAll(flow2) }.toList()

    assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
}

优势

  • 声明式写法,代码简洁;
  • emitAll(flow2) 自动处理 flow2 的所有发射项;

⚠️ 注意事项:

  • onCompletion 在成功完成或异常终止时都会触发,若需精确控制,应结合 catch 使用;
  • 不推荐在 onCompletion 中做复杂逻辑,保持轻量。

5. 使用 collect()emitAll() 组合

这是第 2 节的增强版写法,利用 emitAll 简化第二段 Flow 的发射:

fun concatenateFlowsUsingEmitAll(flow1: Flow<Int>, flow2: Flow<Int>): Flow<Int> = flow {
    flow1.collect { emit(it) }
    emitAll(flow2)
}

✅ 对比优势:

  • 相比手动 collect + emitemitAll 更高效且语义清晰;
  • 避免了嵌套 lambda,可读性更好。

测试代码:

@Test
fun `concatenate two flows using collect and emitAll method`() = runBlocking {
    val flow1 = flowOf(1, 2, 3)
    val flow2 = flowOf(4, 5, 6)
    val result = concatenateFlowsUsingEmitAll(flow1, flow2).toList()

    assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
}

📌 推荐作为默认实现方式之一,尤其适合封装成工具函数。

6. 使用 Reactor 的 Flux.concat()

对于已经引入 Project Reactor 技术栈的项目(如 Spring WebFlux),可以借助 Flux.concat() 实现高性能拼接。

6.1 添加依赖

Maven 配置:

<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-reactor</artifactId>
    <version>1.6.0</version>
</dependency>

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.11</version>
</dependency>

Gradle 配置:

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.6.0"
implementation "io.projectreactor:reactor-core:3.4.11"

6.2 转换与拼接

fun concatenateFlowsUsingReactive(flow1: Flow<Int>, flow2: Flow<Int>): Flow<Int> {
    val flux1 = flow1.asFlux()
    val flux2 = flow2.asFlux()
    return Flux.concat(flux1, flux2).asFlow()
}

✅ 核心流程:

  1. asFlux() 将 Kotlin Flow 转为 Reactor 的 Flux
  2. 使用 Flux.concat(flux1, flux2) 实现高效串行合并;
  3. asFlow() 将结果转回 Kotlin Flow。

测试示例:

@Test
fun `concatenate two flows using reactive`() = runBlocking {
    val flow1 = flowOf(1, 2, 3)
    val flow2 = flowOf(4, 5, 6)
    val result = concatenateFlowsUsingReactive(flow1, flow2).toList()

    assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
}

⚠️ 适用建议:

  • ✅ 已使用 Reactor 的项目可无缝集成;
  • ❌ 单纯为了拼接 Flow 而引入 Reactor 属于过度设计,增加复杂度。

7. 总结

方法 优点 缺点 推荐场景
自定义 Flow 构建器 简单直接,无依赖 手动控制略啰嗦 小型项目、学习理解原理
flattenConcat() 支持多流动态拼接 需构造 Flow<Flow<T>> 批量任务串行执行
onCompletion { emitAll() } 声明式语法,简洁 触发时机需注意异常情况 快速串联两个固定 Flow
collect + emitAll 清晰高效,易封装 稍显底层 推荐作为通用方案
Flux.concat() 性能优,背压处理好 引入额外依赖 已接入 Reactor 生态

📌 选型建议

  • 日常开发优先使用 collect + emitAllflattenConcat()
  • 若追求极致性能且技术栈兼容,可考虑 Reactor 方案;
  • 避免为了炫技而滥用复杂方案,简单可控才是生产环境的王道 ✅。

原始标题:Sequentially Concatenate 2 Kotlin Flows