1. 概述

Reactor 库 中,Flux.map()Flux.doOnNext() 操作符在处理流数据元素时扮演不同角色。

  • Flux.map():用于转换 Flux 发射的每个元素
  • Flux.doOnNext():生命周期钩子,可在元素发射时执行副作用操作

本文将深入剖析这两个操作符的内部实现和实际应用场景,并展示如何组合使用它们。

2. Maven 依赖

要使用 Flux 发布器和其他响应式操作符,需在 pom.xml 中添加 reactor-core 依赖:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.6.5</version>
</dependency>

此依赖提供 FluxMono 等核心类。

同时添加 reactor-test 依赖用于单元测试:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.6.5</version>
    <scope>test</scope>
</dependency>

该依赖提供 StepVerifier 等测试工具类,用于创建测试场景并验证响应式管道行为。

3. 理解 Flux.map() 操作符

Flux.map() 与 Java 内置的 Stream.map() 类似,但作用于响应式流。

3.1. 弹珠图

通过弹珠图理解 Flux.map() 内部机制:

flux map operator marble diagram

图中展示了一个无错误发射数据的 Flux 发布器,以及 map() 操作符对数据的影响:

  • 操作符将圆形数据转换为方形
  • 订阅后发射的是转换后的数据,而非原始数据

3.2. 方法定义

Flux.map() 接受一个 Function 参数,返回包含转换元素的新 Flux

public final <V> Flux<V> map(Function<? super T,? extends V> mapper)

关键特性:

  • 输入:Flux 发布器的数据流
  • mapper 函数同步应用于每个发射元素
  • 输出:基于 mapper 函数转换后的新 Flux

3.3. 示例代码

将数据序列中的每个值乘以 10 进行转换:

Flux<Integer> numbersFlux = Flux.just(50, 51, 52, 53, 54, 55, 56, 57, 58, 59)
  .map(i -> i * 10)
  .onErrorResume(Flux::error);

验证发射的新序列是否符合预期:

StepVerifier.create(numbersFlux)
  .expectNext(500, 510, 520, 530, 540, 550, 560, 570, 580, 590)
  .verifyComplete();

map() 操作符按弹珠图和函数定义处理数据,生成每个值乘以 10 的新输出。

4. 理解 doOnNext() 操作符

Flux.doOnNext() 是生命周期钩子,用于窥探发射的数据流,类似于 Stream.peek()它允许在不修改原始数据流的情况下对每个元素执行副作用操作

4.1. 弹珠图

通过弹珠图理解 Flux.doOnNext() 内部机制:

flux doonnext operator marble diagram

图中展示了 Flux 发射的数据流以及 doOnNext() 操作符对数据的作用。

4.2. 方法定义

查看 doOnNext() 的方法定义:

public final Flux<T> doOnNext(Consumer<? super T> onNext)

关键特性:

  • 接受 Consumer<T> 参数
  • Consumer 是表示副作用操作的函数式接口
  • 消费输入但不产生输出,适合执行副作用操作

4.3. 示例代码

使用 doOnNext() 在订阅时将数据流中的项目记录到控制台:

Flux<Integer> numberFlux = Flux.just(1, 2, 3, 4, 5)
  .doOnNext(number -> {
      LOGGER.info(String.valueOf(number));
  })
  .onErrorResume(Flux::error);

上述代码中,doOnNext() 记录 Flux 发射的每个数字,但不修改实际数值。

5. 组合使用两个操作符

由于 Flux.map()Flux.doOnNext() 功能不同,可在响应式管道中组合使用以实现数据转换和副作用处理

通过记录原始数据和转换后的数据来窥探数据流:

Flux numbersFlux = Flux.just(10, 11, 12, 13, 14)
  .doOnNext(number -> {
      LOGGER.info("Number: " + number);
  })
  .map(i -> i * 5)
  .doOnNext(number -> {
      LOGGER.info("Transformed Number: " + number);
  })
  .onErrorResume(Flux::error);

执行流程:

  1. 使用 doOnNext() 记录 Flux 发射的原始数字
  2. 应用 map() 将每个数字乘以 5 进行转换
  3. 使用另一个 doOnNext() 记录转换后的数字

验证发射数据是否符合预期:

StepVerifier.create(numbersFlux)
  .expectNext(50, 55, 60, 65, 70)
  .verifyComplete();

这种组合方式在转换数据流的同时,通过日志提供对原始和转换元素的可见性。

6. 关键差异

两个操作符都作用于发射数据,但核心区别明显:

特性 Flux.map() Flux.doOnNext()
核心功能 ✅ 转换操作符 ✅ 生命周期钩子
数据修改 ✅ 修改原始数据流 ❌ 不修改数据
返回值 新的转换后 Flux 原始 Flux(带副作用)
典型用例 数据计算、类型转换、业务逻辑处理 日志记录、调试、监控、资源清理

⚠️ 简单粗暴记忆法

  • map() = 数据变形器
  • doOnNext() = 数据观察员

7. 总结

本文深入探讨了 Project Reactor 库中 Flux.map()Flux.doOnNext() 操作符的细节,通过弹珠图、类型定义和实际示例剖析了它们的内部工作机制。

两个操作符服务于不同场景,组合使用可构建强大且健壮的响应式系统。实际开发中踩坑提醒:

  • 避免在 doOnNext() 中执行耗时操作(会阻塞响应式流)
  • map() 中抛出异常会中断流,需配合错误处理操作符

完整示例代码可在 GitHub 获取。


原始标题:Comparison Between Flux.map() and Flux.doOnNext() | Baeldung