1. 概述

本文将深入探讨在 Project Reactor 中组合 Publisher 的多种实用方法。作为响应式编程的核心操作,掌握这些技巧能帮你更优雅地处理数据流。

2. Maven 依赖

首先配置基础依赖环境(版本 3.6.0):

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.6.0</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.6.0</version>
    <scope>test</scope>
</dependency>

3. 组合 Publisher 的方法

当处理 FluxMono 时,有多种方式组合数据流。下面通过两个示例 Flux 流演示核心方法:

Flux<Integer> evenNumbers = Flux
  .range(min, max)
  .filter(x -> x % 2 == 0); // 输出: 2, 4

Flux<Integer> oddNumbers = Flux
  .range(min, max)
  .filter(x -> x % 2 > 0);  // 输出: 1, 3, 5

3.1. concat() 方法

concat 按顺序连接多个数据源,必须等前一个流完成才订阅下一个流。任何错误都会立即中断整个流程。

@Test
public void givenFluxes_whenConcatIsInvoked_thenConcat() {
    Flux<Integer> fluxOfIntegers = Flux.concat(
      evenNumbers, 
      oddNumbers);
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(2)
      .expectNext(4)
      .expectNext(1)
      .expectNext(3)
      .expectNext(5)
      .expectComplete()
      .verify();
}

3.2. concatWith() 方法

concatWithconcat 的实例方法版本,简单粗暴地连接两个流:

@Test
public void givenFluxes_whenConcatWithIsInvoked_thenConcatWith() {
    Flux<Integer> fluxOfIntegers = evenNumbers.concatWith(oddNumbers);
        
    // 验证逻辑同上节 concat 示例
}

3.3. combineLatest() 方法

combineLatest 实时组合各流最新发出的元素,特别适合处理动态数据关联场景:

@Test
public void givenFluxes_whenCombineLatestIsInvoked_thenCombineLatest() {
    Flux<Integer> fluxOfIntegers = Flux.combineLatest(
      evenNumbers, 
      oddNumbers, 
      (a, b) -> a + b);

    StepVerifier.create(fluxOfIntegers)
      .expectNext(5) // 4 + 1
      .expectNext(7) // 4 + 3
      .expectNext(9) // 4 + 5
      .expectComplete()
      .verify();
}

⚠️ 关键点:使用 evenNumbers 的最新值(4)与 oddNumbers 的每个元素组合,生成序列 5,7,9。

3.4. merge() 方法

merge 立即订阅所有流,按元素实际到达顺序合并输出(非订阅顺序):

@Test
public void givenFluxes_whenMergeIsInvoked_thenMerge() {
    Flux<Integer> fluxOfIntegers = Flux.merge(
      evenNumbers, 
      oddNumbers);
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(2)
      .expectNext(4)
      .expectNext(1)
      .expectNext(3)
      .expectNext(5)
      .expectComplete()
      .verify();
}

当流元素存在延迟时,结果会完全不同:

@Test
public void givenFluxes_whenMergeWithDelayedElementsIsInvoked_thenMergeWithDelayedElements() {
    Flux<Integer> fluxOfIntegers = Flux.merge(
      evenNumbers.delayElements(Duration.ofMillis(500L)), 
      oddNumbers.delayElements(Duration.ofMillis(300L)));
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(1)  // oddNumbers 先到达
      .expectNext(2)  // evenNumbers 延迟到达
      .expectNext(3)
      .expectNext(5)
      .expectNext(4)  // evenNumbers 最后元素
      .expectComplete()
      .verify();
}

3.5. mergeSequential() 方法

mergeSequential 结合了 merge 的即时订阅和 concat 的顺序保证:

@Test
public void testMergeSequential() {
    Flux<Integer> fluxOfIntegers = Flux.mergeSequential(
      evenNumbers, 
      oddNumbers);
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(2)
      .expectNext(4)
      .expectNext(1)
      .expectNext(3)
      .expectNext(5)
      .expectComplete()
      .verify();
}

✅ 特点:

  • 立即订阅所有流(非懒加载)
  • 按订阅顺序输出元素

3.6. mergeDelayError() 方法

mergeDelayErrormerge 的容错版本,延迟错误处理直到所有元素处理完毕

@Test
public void givenFluxes_whenMergeWithDelayedElementsIsInvoked_thenMergeWithDelayedElements() {
    Flux<Integer> fluxOfIntegers = Flux.mergeDelayError(1, 
      evenNumbers.delayElements(Duration.ofMillis(500L)), 
      oddNumbers.delayElements(Duration.ofMillis(300L)));
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(1)
      .expectNext(2)
      .expectNext(3)
      .expectNext(5)
      .expectNext(4)
      .expectComplete()
      .verify();
}

3.7. mergeWith() 方法

mergeWithmerge 的实例方法版本,直接合并当前流与目标流:

@Test
public void givenFluxes_whenMergeWithIsInvoked_thenMergeWith() {
    Flux<Integer> fluxOfIntegers = evenNumbers.mergeWith(oddNumbers);
        
    // 验证逻辑同 3.4 节 merge 示例
}

3.8. zip() 方法

zip 严格按位置配对元素,任一流完成即终止组合:

@Test
public void givenFluxes_whenZipIsInvoked_thenZip() {
    Flux<Integer> fluxOfIntegers = Flux.zip(
      evenNumbers, 
      oddNumbers, 
      (a, b) -> a + b);
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(3) // 2 + 1
      .expectNext(7) // 4 + 3
      .expectComplete()
      .verify();
}

❌ 注意:oddNumbers 的元素 5 因无配对被丢弃。

3.9. zipWith() 方法

zipWithzip 的双流专用版本:

@Test
public void givenFluxes_whenZipWithIsInvoked_thenZipWith() {
    Flux<Integer> fluxOfIntegers = evenNumbers
     .zipWith(oddNumbers, (a, b) -> a * b);
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(2)  // 2 * 1
      .expectNext(12) // 4 * 3
      .expectComplete()
      .verify();
}

4. 总结

本文系统梳理了 Project Reactor 中组合 Publisher 的核心方法,关键区别总结如下:

方法 订阅方式 元素顺序 错误处理 适用场景
concat 顺序订阅 严格按流顺序 立即中断 有序流处理
merge 立即订阅 按到达顺序 立即中断 无序流合并
mergeSequential 立即订阅 按订阅顺序 立即中断 需要顺序保证的合并
combineLatest 立即订阅 按最新值组合 立即中断 动态数据关联
zip 立即订阅 按位置配对 立即中断 严格对齐的数据处理

完整示例代码见 GitHub 仓库


原始标题:Combining Publishers in Project Reactor

» 下一篇: Java Weekly, 第218期