1. 概述
本文将深入探讨在 Project Reactor 中组合 Publisher 的多种实用方法。作为响应式编程的核心操作,掌握这些技巧能帮你更优雅地处理数据流。
2. Maven 依赖
首先配置基础依赖环境(版本 3.6.0):
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.6.0</version>
<scope>test</scope>
</dependency>
3. 组合 Publisher 的方法
当处理 Flux
Flux<Integer> evenNumbers = Flux
.range(min, max)
.filter(x -> x % 2 == 0); // 输出: 2, 4
Flux<Integer> oddNumbers = Flux
.range(min, max)
.filter(x -> x % 2 > 0); // 输出: 1, 3, 5
3.1. concat() 方法
concat 按顺序连接多个数据源,必须等前一个流完成才订阅下一个流。任何错误都会立即中断整个流程。
@Test
public void givenFluxes_whenConcatIsInvoked_thenConcat() {
Flux<Integer> fluxOfIntegers = Flux.concat(
evenNumbers,
oddNumbers);
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
3.2. concatWith() 方法
concatWith 是 concat 的实例方法版本,简单粗暴地连接两个流:
@Test
public void givenFluxes_whenConcatWithIsInvoked_thenConcatWith() {
Flux<Integer> fluxOfIntegers = evenNumbers.concatWith(oddNumbers);
// 验证逻辑同上节 concat 示例
}
3.3. combineLatest() 方法
combineLatest 实时组合各流最新发出的元素,特别适合处理动态数据关联场景:
@Test
public void givenFluxes_whenCombineLatestIsInvoked_thenCombineLatest() {
Flux<Integer> fluxOfIntegers = Flux.combineLatest(
evenNumbers,
oddNumbers,
(a, b) -> a + b);
StepVerifier.create(fluxOfIntegers)
.expectNext(5) // 4 + 1
.expectNext(7) // 4 + 3
.expectNext(9) // 4 + 5
.expectComplete()
.verify();
}
⚠️ 关键点:使用 evenNumbers 的最新值(4)与 oddNumbers 的每个元素组合,生成序列 5,7,9。
3.4. merge() 方法
merge 立即订阅所有流,按元素实际到达顺序合并输出(非订阅顺序):
@Test
public void givenFluxes_whenMergeIsInvoked_thenMerge() {
Flux<Integer> fluxOfIntegers = Flux.merge(
evenNumbers,
oddNumbers);
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
当流元素存在延迟时,结果会完全不同:
@Test
public void givenFluxes_whenMergeWithDelayedElementsIsInvoked_thenMergeWithDelayedElements() {
Flux<Integer> fluxOfIntegers = Flux.merge(
evenNumbers.delayElements(Duration.ofMillis(500L)),
oddNumbers.delayElements(Duration.ofMillis(300L)));
StepVerifier.create(fluxOfIntegers)
.expectNext(1) // oddNumbers 先到达
.expectNext(2) // evenNumbers 延迟到达
.expectNext(3)
.expectNext(5)
.expectNext(4) // evenNumbers 最后元素
.expectComplete()
.verify();
}
3.5. mergeSequential() 方法
mergeSequential 结合了 merge 的即时订阅和 concat 的顺序保证:
@Test
public void testMergeSequential() {
Flux<Integer> fluxOfIntegers = Flux.mergeSequential(
evenNumbers,
oddNumbers);
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
✅ 特点:
- 立即订阅所有流(非懒加载)
- 按订阅顺序输出元素
3.6. mergeDelayError() 方法
mergeDelayError 是 merge 的容错版本,延迟错误处理直到所有元素处理完毕:
@Test
public void givenFluxes_whenMergeWithDelayedElementsIsInvoked_thenMergeWithDelayedElements() {
Flux<Integer> fluxOfIntegers = Flux.mergeDelayError(1,
evenNumbers.delayElements(Duration.ofMillis(500L)),
oddNumbers.delayElements(Duration.ofMillis(300L)));
StepVerifier.create(fluxOfIntegers)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(5)
.expectNext(4)
.expectComplete()
.verify();
}
3.7. mergeWith() 方法
mergeWith 是 merge 的实例方法版本,直接合并当前流与目标流:
@Test
public void givenFluxes_whenMergeWithIsInvoked_thenMergeWith() {
Flux<Integer> fluxOfIntegers = evenNumbers.mergeWith(oddNumbers);
// 验证逻辑同 3.4 节 merge 示例
}
3.8. zip() 方法
zip 严格按位置配对元素,任一流完成即终止组合:
@Test
public void givenFluxes_whenZipIsInvoked_thenZip() {
Flux<Integer> fluxOfIntegers = Flux.zip(
evenNumbers,
oddNumbers,
(a, b) -> a + b);
StepVerifier.create(fluxOfIntegers)
.expectNext(3) // 2 + 1
.expectNext(7) // 4 + 3
.expectComplete()
.verify();
}
❌ 注意:oddNumbers 的元素 5 因无配对被丢弃。
3.9. zipWith() 方法
zipWith 是 zip 的双流专用版本:
@Test
public void givenFluxes_whenZipWithIsInvoked_thenZipWith() {
Flux<Integer> fluxOfIntegers = evenNumbers
.zipWith(oddNumbers, (a, b) -> a * b);
StepVerifier.create(fluxOfIntegers)
.expectNext(2) // 2 * 1
.expectNext(12) // 4 * 3
.expectComplete()
.verify();
}
4. 总结
本文系统梳理了 Project Reactor 中组合 Publisher 的核心方法,关键区别总结如下:
方法 | 订阅方式 | 元素顺序 | 错误处理 | 适用场景 |
---|---|---|---|---|
concat | 顺序订阅 | 严格按流顺序 | 立即中断 | 有序流处理 |
merge | 立即订阅 | 按到达顺序 | 立即中断 | 无序流合并 |
mergeSequential | 立即订阅 | 按订阅顺序 | 立即中断 | 需要顺序保证的合并 |
combineLatest | 立即订阅 | 按最新值组合 | 立即中断 | 动态数据关联 |
zip | 立即订阅 | 按位置配对 | 立即中断 | 严格对齐的数据处理 |
完整示例代码见 GitHub 仓库。