1. 概述

在本教程中,我们将深入探讨如何使用 StepVerifierTestPublisher 来测试 响应式流

我们将基于一个包含响应式操作链的 Spring Reactor 应用进行演示。

2. Maven 依赖

Spring Reactor 提供了多个用于测试响应式流的类。

我们可以通过添加 reactor-test 依赖来获取这些测试工具:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
    <version>3.6.0</version>
</dependency>

3. StepVerifier

reactor-test 主要用于两个方面:

✅ 创建基于步骤的测试场景(StepVerifier
✅ 使用 TestPublisher 生成预定义数据,以测试下游操作符的行为

最常见的测试场景是:我们有一个已定义好的 Publisher(FluxMono),想知道当有人订阅它时它的行为是什么样的

通过 StepVerifier API,我们可以按步骤定义对发布元素的期望值,包括:

  • 预期接收到哪些元素
  • 流完成时会发生什么

首先,我们创建一个带有一些操作符的 Publisher:

Flux<String> source = Flux.just("John", "Monica", "Mark", "Cloe", "Frank", "Casper", "Olivia", "Emily", "Cate")
  .filter(name -> name.length() == 4)
  .map(String::toUpperCase);

这个例子中我们过滤出长度为4的名字,并将其转换为大写。

3.1. 逐步验证场景

现在我们使用 StepVerifier 来测试 source 的行为:

StepVerifier
  .create(source)
  .expectNext("JOHN")
  .expectNextMatches(name -> name.startsWith("MA"))
  .expectNext("CLOE", "CATE")
  .expectComplete()
  .verify();

流程如下:

  1. 使用 create() 方法构建 StepVerifier 实例
  2. 传入待测的 Flux
  3. 使用 expectNext() 验证发出的第一个信号,也可以传多个元素进去
  4. 使用 expectNextMatches(Predicate) 自定义匹配逻辑
  5. 最后使用 expectComplete() 表示流应正常结束
  6. 调用 verify() 启动整个测试过程

⚠️ 注意:每个 expectXXX() 方法都代表一个断言步骤,必须严格按照顺序执行。

3.2. 异常处理测试

接下来,我们把 source 和一个会立即抛出异常的 Mono 连接起来:

Flux<String> error = source.concatWith(
  Mono.error(new IllegalArgumentException("Our message"))
);

测试代码如下:

StepVerifier
  .create(error)
  .expectNextCount(4)
  .expectErrorMatches(throwable -> throwable instanceof IllegalArgumentException &&
    throwable.getMessage().equals("Our message")
  ).verify();

这里的关键点是:

✅ 只能有一个异常相关的断言方法(比如 expectErrorMatches
❌ 一旦出现错误信号(OnError),流就关闭了,不能再继续添加其他断言

如果不需要同时检查类型和消息内容,可以使用以下专用方法:

方法 描述
expectError() 任意异常
expectError(Class<? extends Throwable>) 指定类型的异常
expectErrorMessage(String) 指定错误信息
expectErrorMatches(Predicate<Throwable>) 匹配自定义条件
expectErrorSatisfies(Consumer<Throwable>) 自定义断言

3.3. 时间相关 Publisher 的测试

有些 Publisher 是基于时间的。例如,两个事件之间可能间隔一天。

显然我们不能等整整一天才跑完测试。

为此,我们可以使用 StepVerifier.withVirtualTime() 构造器:

StepVerifier
  .withVirtualTime(() -> Flux.interval(Duration.ofSeconds(1)).take(2))
  .expectSubscription()
  .expectNoEvent(Duration.ofSeconds(1))
  .expectNext(0L)
  .thenAwait(Duration.ofSeconds(1))
  .expectNext(1L)
  .verifyComplete();

📌 关键要点:

  • 不要提前实例化 Flux,而是应该在 Supplier 中懒加载创建
  • expectNoEvent(Duration) 必须在 expectSubscription() 之后调用
  • thenAwait(Duration) 是暂停当前验证线程,等待新事件到来

时间相关的主要断言方法:

方法 作用
thenAwait(Duration) 等待指定时间,期间允许事件发生
expectNoEvent(Duration) 在指定时间内不应有任何事件发生

3.4. 测试后置断言

有时我们需要在完整测试流程结束后再做一些额外的状态校验。

比如我们自定义一个 Publisher:

Flux<Integer> source = Flux.<Integer>create(emitter -> {
    emitter.next(1);
    emitter.next(2);
    emitter.next(3);
    emitter.complete();
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    emitter.next(4); // 会被丢弃
}).filter(number -> number % 2 == 0);

预期是输出偶数 2,而 4 因为已经调用了 complete() 所以会被丢弃。

使用 verifyThenAssertThat() 可以在测试完成后继续添加断言:

@Test
public void droppedElements() {
    StepVerifier.create(source)
      .expectNext(2)
      .expectComplete()
      .verifyThenAssertThat()
      .hasDropped(4)
      .tookLessThan(Duration.ofMillis(1050));
}

4. 使用 TestPublisher 生成数据

有时候我们需要构造特定的数据来触发某些信号,或者测试自定义操作符的行为。

这时候就可以使用 *TestPublisher*,它可以让我们手动触发各种信号:

方法 说明
next(T value) 发送一个或多个 next 信号
emit(T value) 发送并自动调用 complete
complete() 发送完成信号
error(Throwable tr) 发送错误信号
flux() / mono() 转换为 Flux / Mono

4.1. 创建 TestPublisher

创建一个简单的 TestPublisher 并发送几个信号:

TestPublisher
  .<String>create()
  .next("First", "Second", "Third")
  .error(new RuntimeException("Message"));

4.2. 实战应用

假设我们有一个处理字符串转大写的类:

class UppercaseConverter {
    private final Flux<String> source;

    UppercaseConverter(Flux<String> source) {
        this.source = source;
    }

    Flux<String> getUpperCase() {
        return source.map(String::toUpperCase);
    }   
}

我们可以使用 TestPublisher 构造特定输入进行测试:

final TestPublisher<String> testPublisher = TestPublisher.create();

UppercaseConverter uppercaseConverter = new UppercaseConverter(testPublisher.flux());

StepVerifier.create(uppercaseConverter.getUpperCase())
  .then(() -> testPublisher.emit("aA", "bb", "ccc"))
  .expectNext("AA", "BB", "CCC")
  .verifyComplete();

这样就可以精确控制输入,验证复杂逻辑下的行为。

4.3. 非规范行为的 TestPublisher

我们还可以使用 createNonCompliant() 工厂方法创建“不守规矩”的 Publisher,用于测试边界情况。

TestPublisher
  .createNoncompliant(TestPublisher.Violation.ALLOW_NULL)
  .emit("1", "2", null, "3");

可用的违规选项包括:

Violation 说明
ALLOW_NULL 允许发送 null 值而不抛出 NPE
REQUEST_OVERFLOW 允许在请求数不足时仍发送 next
CLEANUP_ON_TERMINATE 允许重复发送终止信号
DEFER_CANCELLATION 忽略取消信号继续发送

5. 总结

本文介绍了如何使用 Spring Reactor 中的 StepVerifierTestPublisher 来测试响应式流。

我们学习了:

✅ 如何使用 StepVerifier 对 Publisher 进行逐步测试
✅ 如何使用 TestPublisher 构造测试数据
✅ 如何测试时间相关的 Publisher
✅ 如何验证非标准行为和后置状态

完整示例代码可在 GitHub 项目 中找到。


原始标题:Testing Reactive Streams Using StepVerifier and TestPublisher | Baeldung