1. 概述
在本教程中,我们将深入探讨如何使用 StepVerifier 和 TestPublisher 来测试 响应式流。
我们将基于一个包含响应式操作链的 Spring Reactor 应用进行演示。
2. Maven 依赖
Spring Reactor 提供了多个用于测试响应式流的类。
我们可以通过添加 reactor-test 依赖来获取这些测试工具:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
<version>3.6.0</version>
</dependency>
3. StepVerifier
reactor-test
主要用于两个方面:
✅ 创建基于步骤的测试场景(StepVerifier)
✅ 使用 TestPublisher 生成预定义数据,以测试下游操作符的行为
最常见的测试场景是:我们有一个已定义好的 Publisher(Flux 或 Mono),想知道当有人订阅它时它的行为是什么样的。
通过 StepVerifier API,我们可以按步骤定义对发布元素的期望值,包括:
- 预期接收到哪些元素
- 流完成时会发生什么
首先,我们创建一个带有一些操作符的 Publisher:
Flux<String> source = Flux.just("John", "Monica", "Mark", "Cloe", "Frank", "Casper", "Olivia", "Emily", "Cate")
.filter(name -> name.length() == 4)
.map(String::toUpperCase);
这个例子中我们过滤出长度为4的名字,并将其转换为大写。
3.1. 逐步验证场景
现在我们使用 StepVerifier 来测试 source
的行为:
StepVerifier
.create(source)
.expectNext("JOHN")
.expectNextMatches(name -> name.startsWith("MA"))
.expectNext("CLOE", "CATE")
.expectComplete()
.verify();
流程如下:
- 使用
create()
方法构建 StepVerifier 实例 - 传入待测的 Flux
- 使用
expectNext()
验证发出的第一个信号,也可以传多个元素进去 - 使用
expectNextMatches(Predicate)
自定义匹配逻辑 - 最后使用
expectComplete()
表示流应正常结束 - 调用
verify()
启动整个测试过程
⚠️ 注意:每个 expectXXX()
方法都代表一个断言步骤,必须严格按照顺序执行。
3.2. 异常处理测试
接下来,我们把 source
和一个会立即抛出异常的 Mono 连接起来:
Flux<String> error = source.concatWith(
Mono.error(new IllegalArgumentException("Our message"))
);
测试代码如下:
StepVerifier
.create(error)
.expectNextCount(4)
.expectErrorMatches(throwable -> throwable instanceof IllegalArgumentException &&
throwable.getMessage().equals("Our message")
).verify();
这里的关键点是:
✅ 只能有一个异常相关的断言方法(比如 expectErrorMatches
)
❌ 一旦出现错误信号(OnError),流就关闭了,不能再继续添加其他断言
如果不需要同时检查类型和消息内容,可以使用以下专用方法:
方法 | 描述 |
---|---|
expectError() |
任意异常 |
expectError(Class<? extends Throwable>) |
指定类型的异常 |
expectErrorMessage(String) |
指定错误信息 |
expectErrorMatches(Predicate<Throwable>) |
匹配自定义条件 |
expectErrorSatisfies(Consumer<Throwable>) |
自定义断言 |
3.3. 时间相关 Publisher 的测试
有些 Publisher 是基于时间的。例如,两个事件之间可能间隔一天。
显然我们不能等整整一天才跑完测试。
为此,我们可以使用 StepVerifier.withVirtualTime()
构造器:
StepVerifier
.withVirtualTime(() -> Flux.interval(Duration.ofSeconds(1)).take(2))
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(1))
.expectNext(0L)
.thenAwait(Duration.ofSeconds(1))
.expectNext(1L)
.verifyComplete();
📌 关键要点:
- 不要提前实例化 Flux,而是应该在 Supplier 中懒加载创建
expectNoEvent(Duration)
必须在expectSubscription()
之后调用thenAwait(Duration)
是暂停当前验证线程,等待新事件到来
时间相关的主要断言方法:
方法 | 作用 |
---|---|
thenAwait(Duration) |
等待指定时间,期间允许事件发生 |
expectNoEvent(Duration) |
在指定时间内不应有任何事件发生 |
3.4. 测试后置断言
有时我们需要在完整测试流程结束后再做一些额外的状态校验。
比如我们自定义一个 Publisher:
Flux<Integer> source = Flux.<Integer>create(emitter -> {
emitter.next(1);
emitter.next(2);
emitter.next(3);
emitter.complete();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
emitter.next(4); // 会被丢弃
}).filter(number -> number % 2 == 0);
预期是输出偶数 2
,而 4
因为已经调用了 complete()
所以会被丢弃。
使用 verifyThenAssertThat()
可以在测试完成后继续添加断言:
@Test
public void droppedElements() {
StepVerifier.create(source)
.expectNext(2)
.expectComplete()
.verifyThenAssertThat()
.hasDropped(4)
.tookLessThan(Duration.ofMillis(1050));
}
4. 使用 TestPublisher 生成数据
有时候我们需要构造特定的数据来触发某些信号,或者测试自定义操作符的行为。
这时候就可以使用 *TestPublisher
方法 | 说明 |
---|---|
next(T value) |
发送一个或多个 next 信号 |
emit(T value) |
发送并自动调用 complete |
complete() |
发送完成信号 |
error(Throwable tr) |
发送错误信号 |
flux() / mono() |
转换为 Flux / Mono |
4.1. 创建 TestPublisher
创建一个简单的 TestPublisher 并发送几个信号:
TestPublisher
.<String>create()
.next("First", "Second", "Third")
.error(new RuntimeException("Message"));
4.2. 实战应用
假设我们有一个处理字符串转大写的类:
class UppercaseConverter {
private final Flux<String> source;
UppercaseConverter(Flux<String> source) {
this.source = source;
}
Flux<String> getUpperCase() {
return source.map(String::toUpperCase);
}
}
我们可以使用 TestPublisher 构造特定输入进行测试:
final TestPublisher<String> testPublisher = TestPublisher.create();
UppercaseConverter uppercaseConverter = new UppercaseConverter(testPublisher.flux());
StepVerifier.create(uppercaseConverter.getUpperCase())
.then(() -> testPublisher.emit("aA", "bb", "ccc"))
.expectNext("AA", "BB", "CCC")
.verifyComplete();
这样就可以精确控制输入,验证复杂逻辑下的行为。
4.3. 非规范行为的 TestPublisher
我们还可以使用 createNonCompliant()
工厂方法创建“不守规矩”的 Publisher,用于测试边界情况。
TestPublisher
.createNoncompliant(TestPublisher.Violation.ALLOW_NULL)
.emit("1", "2", null, "3");
可用的违规选项包括:
Violation | 说明 |
---|---|
ALLOW_NULL |
允许发送 null 值而不抛出 NPE |
REQUEST_OVERFLOW |
允许在请求数不足时仍发送 next |
CLEANUP_ON_TERMINATE |
允许重复发送终止信号 |
DEFER_CANCELLATION |
忽略取消信号继续发送 |
5. 总结
本文介绍了如何使用 Spring Reactor 中的 StepVerifier 和 TestPublisher 来测试响应式流。
我们学习了:
✅ 如何使用 StepVerifier 对 Publisher 进行逐步测试
✅ 如何使用 TestPublisher 构造测试数据
✅ 如何测试时间相关的 Publisher
✅ 如何验证非标准行为和后置状态
完整示例代码可在 GitHub 项目 中找到。