1. 引言

本文将探讨 Spring Webflux 开发中一个常见错误:java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blockingSpring Webflux 是一个非阻塞 Web 框架,专为充分利用多核新一代处理器和处理大规模并发连接而设计。

由于是非阻塞框架,线程绝不能被阻塞。接下来我们深入分析这个问题。

2. Spring Webflux 线程模型

要理解这个问题,需要先了解 Spring Webflux 的线程模型

在 Spring Webflux 中,少量工作线程池处理所有传入请求。这与 Servlet 模型(每个请求分配独立线程)形成对比。因此框架特别保护这些请求接收线程的执行环境。

基于这个理解,我们聚焦到本文的核心问题。

3. 理解线程阻塞导致的 IllegalStateException

通过示例说明何时何地会出现 java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread 错误。

以文件搜索 API 为例:该 API 从文件系统读取文件,并在文件中搜索用户提供的文本。

3.1. 文件服务

首先定义 FileService 类,以字符串形式读取文件内容:

@Service
public class FileService {
    @Value("${files.base.dir:/tmp/bael-7724}")
    private String filesBaseDir;

    public Mono<String> getFileContentAsString(String fileName) {
        return DataBufferUtils.read(Paths.get(filesBaseDir + "/" + fileName), DefaultDataBufferFactory.sharedInstance, DefaultDataBufferFactory.DEFAULT_INITIAL_CAPACITY)
          .map(dataBuffer -> dataBuffer.toString(StandardCharsets.UTF_8))
          .reduceWith(StringBuilder::new, StringBuilder::append)
          .map(StringBuilder::toString);
    }
}

注意:FileService响应式(异步)方式读取文件。

3.2. 文件内容搜索服务

利用 FileService 编写文件搜索服务:

@Service
public class FileContentSearchService {
    @Autowired
    private FileService fileService;

    public Mono<Boolean> blockingSearch(String fileName, String searchTerm) {
        String fileContent = fileService
          .getFileContentAsString(fileName)
          .doOnNext(content -> ThreadLogger.log("1. BlockingSearch"))
          .block();

        boolean isSearchTermPresent = fileContent.contains(searchTerm);

        return Mono.just(isSearchTermPresent);
    }
}

该服务根据文件中是否找到搜索词返回布尔值。调用 getFileContentAsString() 后,因结果是异步的 Mono<String>,我们调用 block() 获取字符串值。最后用 Mono 包装结果返回。

3.3. 文件控制器

FileController 使用 FileContentSearchServiceblockingSearch() 方法:

@RestController
@RequestMapping("bael7724/v1/files")
public class FileController {
    ...
    @GetMapping(value = "/{name}/blocking-search")
    Mono<Boolean> blockingSearch(@PathVariable("name") String fileName, @RequestParam String term) {
        return fileContentSearchService.blockingSearch(fileName, term);
    }
}

3.4. 复现异常

控制器调用服务层方法,而服务层调用了 block()。由于在请求接收线程上执行,调用 API 时会触发目标异常:

12:28:51.610 [reactor-http-epoll-2] ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [ea98e542-1]  500 Server Error for HTTP GET "/bael7724/v1/files/a/blocking-search?term=a"
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-2
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    *__checkpoint ⇢ com.baeldung.filters.TraceWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ com.baeldung.filters.ExceptionalTraceFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ HTTP GET "/bael7724/v1/files/a/blocking-search?term=a" [ExceptionHandlingWebHandler]
Original Stack Trace:
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
    at reactor.core.publisher.Mono.block(Mono.java:1712)
    at com.baeldung.bael7724.service.FileContentSearchService.blockingSearch(FileContentSearchService.java:20)
    at com.baeldung.bael7724.controller.FileController.blockingSearch(FileController.java:35)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
    at java.base/jdk.internal.reflect.Method.invoke(Method.java:580)

3.5. 根本原因

异常根源在于**在请求接收线程上调用了 block()**。示例代码中,block() 在请求接收线程池的某个线程上执行。具体来说,是在标记为"仅非阻塞操作"的线程(如 Schedulers.parallel() 启动的线程)上调用,这些线程实现了 Reactor 的 NonBlocking 标记接口。

4. 解决方案

4.1. 拥抱响应式操作

最佳实践是使用响应式操作替代 block()。用 map() 操作将 String 转换为 Boolean

public Mono<Boolean> nonBlockingSearch(String fileName, String searchTerm) {
    return fileService.getFileContentAsString(fileName)
      .doOnNext(content -> ThreadLogger.log("1. NonBlockingSearch"))
      .map(content -> content.contains(searchTerm))
      .doOnNext(content -> ThreadLogger.log("2. NonBlockingSearch"));
}

这样完全消除了 block() 的需求。运行时线程上下文如下:

[1. NonBlockingSearch] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506215299Z
[2. NonBlockingSearch] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506361786Z
[1. In Controller] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506465805Z
[2. In Controller] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506543145Z

日志显示所有操作在同一个请求接收线程池执行。

⚠️ 注意:即使没有异常,I/O 操作(如文件读取)仍建议在独立线程池执行

4.2. 在有界弹性线程池中阻塞

如果无法避免 block(),需切换线程池。Spring Webflux 提供 publishOn()subscribeOn() 切换线程:

public Mono<Boolean> workableBlockingSearch(String fileName, String searchTerm) {
    return Mono.just("")
      .doOnNext(s -> ThreadLogger.log("1. WorkableBlockingSearch"))
      .publishOn(Schedulers.boundedElastic())
      .doOnNext(s -> ThreadLogger.log("2. WorkableBlockingSearch"))
      .map(s -> fileService.getFileContentAsString(fileName)
        .block()
        .contains(searchTerm))
      .doOnNext(s -> ThreadLogger.log("3. WorkableBlockingSearch"));
}

使用 publishOn() 切换后续操作的线程池(不影响订阅和上游操作)。切换到有界弹性线程池后,可安全调用 block()

运行时线程上下文:

[1. WorkableBlockingSearch] ThreadName: parallel-2, Time: 2024-06-17T07:40:59.440562518Z
[2. WorkableBlockingSearch] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.442161018Z
[3. WorkableBlockingSearch] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.442891230Z
[1. In Controller] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.443058091Z
[2. In Controller] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.443181770Z

从步骤 2 开始的操作都在有界弹性线程池执行,因此未触发异常。

4.3. 注意事项

使用 block() 时需注意以下陷阱:

❌ 错误示例:线程切换位置不当

public Mono<Boolean> incorrectUseOfSchedulersSearch(String fileName, String searchTerm) {
    String fileContent = fileService.getFileContentAsString(fileName)
      .doOnNext(content -> ThreadLogger.log("1. IncorrectUseOfSchedulersSearch"))
      .publishOn(Schedulers.boundedElastic())
      .doOnNext(content -> ThreadLogger.log("2. IncorrectUseOfSchedulersSearch"))
      .block();

    boolean isSearchTermPresent = fileContent.contains(searchTerm);

    return Mono.just(isSearchTermPresent);
}

即使使用了 publishOn(),仍会触发异常:

[1. IncorrectUseOfSchedulersSearch] ThreadName: Thread-4, Time: 2024-06-17T08:57:02.490298417Z
[2. IncorrectUseOfSchedulersSearch] ThreadName: boundedElastic-1, Time: 2024-06-17T08:57:02.491870410Z
14:27:02.495 [parallel-1] ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [53e4bce1]  500 Server Error for HTTP GET "/bael7724/v1/files/robots.txt/incorrect-use-of-schedulers-search?term=r-"
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1
    ...(同上异常堆栈)

原因:block() 仍在原始请求接收线程池执行。

⚠️ 禁止使用的线程池

即使切换线程池,不能使用并行线程池Schedulers.parallel()),因其线程同样禁止阻塞操作。

✅ 替代方案

Schedulers.boundedElastic() 外,还可通过 Schedulers.fromExecutorService() 使用自定义线程池。

5. 结论

在 Spring Webflux 中解决 IllegalStateException 的关键点:

  1. 首选响应式操作:使用 map() 等操作符避免 block(),保持非阻塞特性
  2. 必要时切换线程池:若必须使用 block(),通过 publishOn(Schedulers.boundedElastic()) 切换到有界弹性线程池
  3. 规避陷阱
    • 确保 block() 在切换后的线程池执行
    • 避免在并行线程池等禁止阻塞的线程上操作
    • 可使用自定义线程池替代默认有界弹性池

正确处理线程上下文切换,既能避免异常,又能维持应用的稳定性和高性能。

本文源码可在 GitHub 获取。


原始标题:How to Solve “java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking” | Baeldung