1. 概述

在我们之前发布的 Project Reactor 入门 文章中,已经介绍过 Mono<T> —— 它是一个发布类型为 T 的响应式数据流(Publisher),最多只发射一个元素。

本文将带你实战两种从 Mono 中提取出内部值 T 的方式:
阻塞式(blocking):直接获取结果,线程挂起等待
非阻塞式(non-blocking):通过回调处理结果,符合响应式编程理念

重点来了:虽然阻塞方式写起来简单粗暴,但在真正的响应式系统中属于“踩坑”行为,慎用!


2. 阻塞式提取

Mono 正常情况下会在某个时刻成功发出一个值。我们可以使用 block() 方法同步等待这个值的到来。

基础用法

Mono<String> blockingHelloWorld() {
    return Mono.just("Hello world!");
}

String result = blockingHelloWorld().block();
assertEquals("Hello world!", result);

上面这段代码会一直阻塞当前线程,直到 Mono 发出值为止。问题是:你不知道它要等多久。

设置超时时间

为了防止无限等待,可以加上超时控制:

String result = blockingHelloWorld().block(Duration.of(1000, ChronoUnit.MILLIS));
assertEquals("Hello world!", result);

⚠️ 如果在 1 秒内没有收到结果,block(timeout) 会抛出 RuntimeException

处理空值情况

如果 Mono 是空的(比如 Mono.empty()),调用 block() 会返回 null,容易引发 NPE。更安全的做法是使用 blockOptional()

Optional<String> result = Mono.<String>empty().blockOptional();
assertEquals(Optional.empty(), result);

这样你可以用 Optional 安全地判断是否有值。

📌 小贴士:block()blockOptional() 只建议在测试或与非响应式代码集成时使用。在 WebFlux 或响应式服务中滥用 block() 相当于把异步变同步,直接破坏了非阻塞的优势。


3. 非阻塞式提取

这才是响应式编程的正确打开方式 —— 不阻塞线程,通过订阅(subscribe)注册回调函数来处理数据。

使用 subscribe() 注册消费者

blockingHelloWorld()
  .subscribe(result -> assertEquals("Hello world!", result));

✅ 这行代码执行后会立即返回,不会阻塞当前线程。当 Mono 最终发出值时,Lambda 回调会被触发。

这就是典型的“发布-订阅”模型:你只负责订阅结果,执行流程继续往下走,完全无感等待。

在中间阶段处理数据:doOnNext

有时候你不需要最终订阅,只是想在某个阶段“观察”一下数据流,比如打日志、做监控:

blockingHelloWorld()
  .doOnNext(result -> System.out.println("Received: " + result))
  .subscribe();

📌 注意:

  • doOnNext() 是“副作用”操作(side-effect),它不会改变数据流,仅用于观察
  • 它必须配合 subscribe() 才会真正触发执行(冷流特性)

⚠️ 踩坑提醒:如果你只写了 doOnNext() 但忘了 subscribe(),那什么都不会发生!因为 Mono 是懒加载的,不订阅就不执行。


4. 总结

方式 方法 是否推荐 适用场景
阻塞式 block() / blockOptional() ❌ 不推荐 测试、调试、桥接传统同步代码
非阻塞式 subscribe() + 回调 ✅ 强烈推荐 生产环境、响应式服务(如 WebFlux)

📌 核心原则:

能不用 block 就不用,能用 subscribe 就用 subscribe

响应式编程的魅力就在于“不等”,一旦你开始 block,就等于放弃了非阻塞的全部优势。


✅ 所有示例代码均可在 GitHub 获取:https://github.com/eugenp/tutorials/tree/master/reactor-core


原始标题:How to Extract a Mono’s Content in Java