1. 概述
在我们之前发布的 Project Reactor 入门 文章中,已经介绍过 Mono<T>
—— 它是一个发布类型为 T
的响应式数据流(Publisher),最多只发射一个元素。
本文将带你实战两种从 Mono
中提取出内部值 T
的方式:
✅ 阻塞式(blocking):直接获取结果,线程挂起等待
❌ 非阻塞式(non-blocking):通过回调处理结果,符合响应式编程理念
重点来了:虽然阻塞方式写起来简单粗暴,但在真正的响应式系统中属于“踩坑”行为,慎用!
2. 阻塞式提取
Mono
正常情况下会在某个时刻成功发出一个值。我们可以使用 block()
方法同步等待这个值的到来。
基础用法
Mono<String> blockingHelloWorld() {
return Mono.just("Hello world!");
}
String result = blockingHelloWorld().block();
assertEquals("Hello world!", result);
上面这段代码会一直阻塞当前线程,直到 Mono
发出值为止。问题是:你不知道它要等多久。
设置超时时间
为了防止无限等待,可以加上超时控制:
String result = blockingHelloWorld().block(Duration.of(1000, ChronoUnit.MILLIS));
assertEquals("Hello world!", result);
⚠️ 如果在 1 秒内没有收到结果,block(timeout)
会抛出 RuntimeException
。
处理空值情况
如果 Mono
是空的(比如 Mono.empty()
),调用 block()
会返回 null
,容易引发 NPE。更安全的做法是使用 blockOptional()
:
Optional<String> result = Mono.<String>empty().blockOptional();
assertEquals(Optional.empty(), result);
这样你可以用 Optional
安全地判断是否有值。
📌 小贴士:
block()
和blockOptional()
只建议在测试或与非响应式代码集成时使用。在 WebFlux 或响应式服务中滥用block()
相当于把异步变同步,直接破坏了非阻塞的优势。
3. 非阻塞式提取
这才是响应式编程的正确打开方式 —— 不阻塞线程,通过订阅(subscribe)注册回调函数来处理数据。
使用 subscribe() 注册消费者
blockingHelloWorld()
.subscribe(result -> assertEquals("Hello world!", result));
✅ 这行代码执行后会立即返回,不会阻塞当前线程。当 Mono
最终发出值时,Lambda 回调会被触发。
这就是典型的“发布-订阅”模型:你只负责订阅结果,执行流程继续往下走,完全无感等待。
在中间阶段处理数据:doOnNext
有时候你不需要最终订阅,只是想在某个阶段“观察”一下数据流,比如打日志、做监控:
blockingHelloWorld()
.doOnNext(result -> System.out.println("Received: " + result))
.subscribe();
📌 注意:
doOnNext()
是“副作用”操作(side-effect),它不会改变数据流,仅用于观察- 它必须配合
subscribe()
才会真正触发执行(冷流特性)
⚠️ 踩坑提醒:如果你只写了 doOnNext()
但忘了 subscribe()
,那什么都不会发生!因为 Mono
是懒加载的,不订阅就不执行。
4. 总结
方式 | 方法 | 是否推荐 | 适用场景 |
---|---|---|---|
阻塞式 | block() / blockOptional() |
❌ 不推荐 | 测试、调试、桥接传统同步代码 |
非阻塞式 | subscribe() + 回调 |
✅ 强烈推荐 | 生产环境、响应式服务(如 WebFlux) |
📌 核心原则:
能不用 block 就不用,能用 subscribe 就用 subscribe
响应式编程的魅力就在于“不等”,一旦你开始 block
,就等于放弃了非阻塞的全部优势。
✅ 所有示例代码均可在 GitHub 获取:https://github.com/eugenp/tutorials/tree/master/reactor-core