1. 概述

Kotlin Coroutines 能显著提升异步、回调密集型代码的可读性。

本文将深入探讨如何利用协程构建非阻塞的 Spring Boot 应用,并对比响应式编程与协程两种实现方式的差异与适用场景。

2. 为什么选择协程

现代系统常需处理成千上万甚至百万级的并发请求,因此开发领域正逐步转向非阻塞计算和请求处理。通过将 I/O 操作从核心线程中卸载,能更高效地利用系统资源,从而支撑远超传统“每个请求一个线程”模型的并发量。

异步处理本身复杂且易出错。幸运的是,我们有诸如 Java 的 CompletableFuturesRxJava 这样的响应式库来应对这种复杂性。Spring 框架也早已通过 ReactorWebFlux 提供了成熟的响应式支持。

然而,响应式代码(尤其是链式调用)往往难以阅读和调试。Kotlin 语言提供的协程(Coroutines)概念,允许开发者以近乎同步的顺序风格编写并发和异步代码,极大地提升了代码的可读性和维护性。

协程非常灵活,通过 Job 和 Scope 可以精细控制任务的执行。此外,✅ Kotlin 协程能完美地与现有的 Java 非阻塞框架(如 Reactor)协同工作

Spring 从 5.2 版本开始正式为 Kotlin 协程提供原生支持。

3. 项目搭建

首先,我们需要引入必要的依赖。

我们将使用 Netty 作为底层的异步、事件驱动的客户端-服务器框架。Netty 将作为嵌入式的响应式 Web 服务器实现。当然,由于 Servlet 3.0 规范已支持非阻塞请求处理,你也可以选择 Jetty 或 Tomcat 等 Servlet 容器。

以下是基于 Spring Boot 3.1.4 的关键依赖配置:

<properties>
    <kotlin.version>1.7.0</kotlin.version>
    <r2dbc.version>1.0.0.RELEASE</r2dbc.version>
    <r2dbc-spi.version>1.0.0.RELEASE</r2dbc-spi.version>
    <h2-r2dbc.version>1.0.0.RELEASE</h2-r2dbc.version>
    <spring-boot.version>3.1.4.RELEASE</spring-boot.version>
</properties>

⚠️ **最关键的一步:必须使用 spring-boot-starter-webflux 而非 spring-boot-starter-web**,因为我们要依赖 WebFlux 进行异步处理:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>${spring-boot.version}</version>
</dependency>

接下来,添加 R2DBC 依赖以支持响应式数据库访问:

<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-h2</artifactId>
    <version>${h2-r2dbc.version}</version>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-spi</artifactId>
    <version>${r2dbc-spi.version}</version>
</dependency>

最后,添加 Kotlin 核心及协程相关依赖:

<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-core</artifactId>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-reactor</artifactId>
</dependency>

4. Spring Data R2DBC 与协程

本节重点介绍如何以响应式和协程两种风格访问数据库。

4.1. 响应式 R2DBC

R2DBC 是一个 API 规范,定义了由数据库厂商实现的响应式 API。我们的数据存储将使用内存中的 H2 database,同时 PostgreSQL 和 SQL Server 也有对应的响应式驱动。

首先,使用响应式方式实现一个简单的仓库(Repository):

@Repository
class ProductRepository(private val client: DatabaseClient) {

    fun getProductById(id: Int): Mono<Product> {
        return client.execute()
          .sql("SELECT * FROM products WHERE id = $1")
          .bind(0, id)
          .`as`(Product::class.java)
          .fetch()
          .one()
    }

    fun addNewProduct(name: String, price: Float): Mono<Void> {
        return client.execute()
          .sql("INSERT INTO products (name, price) VALUES($1, $2)")
          .bind(0, name)
          .bind(1, price)
          .then()
    }

    fun getAllProducts(): Flux<Product> {
        return client.select()
          .from("products")
          .`as`(Product::class.java)
          .fetch()
          .all()
    }
}

这里使用了非阻塞的 DatabaseClient 执行数据库查询。接下来,我们将其重构为协程风格。

4.2. R2DBC 与协程

要将响应式函数转换为协程 API,核心是使用 suspend 修饰符并调整返回类型:

  • 无返回值函数:去掉 Mono<Void>,直接声明为 suspend fun
  • 单个结果函数:将 Mono<T> 替换为 T?(或 T),移除包装。
  • 多个结果函数:将 Flux<T> 替换为 Flow<T>

应用这些规则进行重构:

@Repository
class ProductRepositoryCoroutines(private val client: DatabaseClient) {

    suspend fun getProductById(id: Int): Product? =
        client.execute()
          .sql("SELECT * FROM products WHERE id = $1")
          .bind(0, id)
          .`as`(Product::class.java)
          .fetch()
          .one()
          .awaitFirstOrNull()

    suspend fun addNewProduct(name: String, price: Float) =
        client.execute()
          .sql("INSERT INTO products (name, price) VALUES($1, $2)")
          .bind(0, name)
          .bind(1, price)
          .then()
          .awaitFirstOrNull()

    @FlowPreview
    fun getAllProducts(): Flow<Product> =
        client.select()
          .from("products")
          .`as`(Product::class.java)
          .fetch()
          .all()
          .asFlow()
}

需要注意以下几点:

  • awaitFirstOrNull() 等扩展函数来自 kotlin-coroutines-reactive 库。
  • 更多有用的扩展可以在 spring-data-r2dbc 中找到。

5. Spring WebFlux 控制器

现在我们有了仓库层,接下来创建非阻塞的控制器(Controller)。

5.1. 响应式控制器

先看熟悉的响应式风格:

@RestController
class ProductController {
    @Autowired
    lateinit var productRepository: ProductRepository

    @GetMapping("/{id}")
    fun findOne(@PathVariable id: Int): Mono<Product> {
        return productRepository.getProductById(id)
    }

    @GetMapping("/")
    fun findAll(): Flux<Product> {
        return productRepository.getAllProducts()
    }
}

关键问题:实际的 I/O 操作在哪个线程上执行? 默认情况下,每个查询操作都在 Reactor 的 NIO 线程池中的某个线程上运行,由底层调度器(Scheduler)决定。

5.2. 协程控制器

使用协程重构控制器:

@RestController
class ProductControllerCoroutines {
    @Autowired
    lateinit var productRepository: ProductRepositoryCoroutines

    @GetMapping("/{id}")
    suspend fun findOne(@PathVariable id: Int): Product? {
        return productRepository.getProductById(id)
    }

    @FlowPreview
    @GetMapping("/")
    fun findAll(): Flow<Product> {
        return productRepository.getAllProducts()
    }
}

注意:

  • findAll() 返回 Flow,它内部会调用挂起函数,但自身不是挂起函数。
  • 数据库查询仍在相同的 Reactor NIO 线程上执行。

6. Spring WebFlux WebClient

假设系统中存在微服务架构,需要调用其他服务获取数据(例如商品库存)。

6.1. 响应式 WebClient

使用 WebClient 发起请求:

@GetMapping("/{id}/stock")
fun findOneInStock(@PathVariable id: Int): Mono<ProductStockView> {
   val product = productRepository.getProductById(id)
   
   val stockQuantity = webClient.get()
     .uri("/stock-service/product/$id/quantity")
     .accept(MediaType.APPLICATION_JSON)
     .retrieve()
     .bodyToMono<Int>()
   return product.zipWith(stockQuantity) { 
       productInStock, stockQty ->
         ProductStockView(productInStock, stockQty)
   }
}

使用 zipWith 组合两个 Mono,等待两者完成后再合并结果。

6.2. WebClient 与协程

使用协程时,通过 awaitBody() 扩展函数发起请求:

@GetMapping("/{id}/stock")
suspend fun findOneInStock(@PathVariable id: Int): ProductStockView {
    val product = productRepository.getProductById(id)
    val quantity = webClient.get()
      .uri("/stock-service/product/$id/quantity")
      .accept(APPLICATION_JSON)
      .retrieve()
      .awaitBody<Int>()
    return ProductStockView(product!!, quantity)
}

这看起来像同步代码,但实际上是异步的。这是协程带来的巨大可读性优势。

⚠️ 踩坑点:上述代码中,数据库查询和 Web 请求是串行执行的!因为协程默认是顺序的。

如何并行执行?使用 async 构建并行分解:

@GetMapping("/{id}/stock")
suspend fun findOneInStock(@PathVariable id: Int): ProductStockView = coroutineScope {
    val product: Deferred<Product?> = async(start = CoroutineStart.LAZY) {
        productRepository.getProductById(id)
    }
    val quantity: Deferred<Int> = async(start = CoroutineStart.LAZY) {
        webClient.get()
          .uri("/stock-service/product/$id/quantity")
          .accept(APPLICATION_JSON)
          .retrieve().awaitBody<Int>()
    }
    product.start()
    quantity.start()
    ProductStockView(product.await()!!, quantity.await())
}

要点:

  • async 创建 Deferred 对象,代表一个异步计算。
  • 使用 CoroutineStart.LAZY 防止立即启动。
  • 显式调用 start() 启动任务,实现并行。
  • coroutineScope 确保所有子协程完成后才退出,且不阻塞当前线程。

底层机制:async 块内的函数被分发到独立的工作线程,而真正的 I/O 操作仍在 Reactor 的 NIO 线程池中执行。

7. WebFlux.fn DSL 路由

最后,看看如何在 WebFlux 的函数式 DSL 路由中使用协程。

Spring 提供了 coRouter {} DSL 来定义协程友好的路由:

@Configuration
class RouterConfiguration {
    @FlowPreview
    @Bean
    fun productRoutes(productsHandler: ProductsHandler) = coRouter {
        GET("/", productsHandler::findAll)
        GET("/{id}", productsHandler::findOne)
        GET("/{id}/stock", productsHandler::findOneInStock)
    }
}

实现对应的 ProductsHandler

@Component
class ProductsHandler(
  @Autowired var webClient: WebClient, 
  @Autowired var productRepository: ProductRepositoryCoroutines) {
    
    @FlowPreview
    suspend fun findAll(request: ServerRequest): ServerResponse =
        ServerResponse.ok().json().bodyAndAwait(productRepository.getAllProducts())

    suspend fun findOneInStock(request: ServerRequest): ServerResponse {
        val id = request.pathVariable("id").toInt()
        val product: Deferred<Product?> = GlobalScope.async {
            productRepository.getProductById(id)
        }
        val quantity: Deferred<Int> = GlobalScope.async {
            webClient.get()
              .uri("/stock-service/product/$id/quantity")
              .accept(MediaType.APPLICATION_JSON)
              .retrieve().awaitBody<Int>()
        }
        return ServerResponse.ok()
          .json()
          .bodyValueAndAwait(ProductStockView(product.await()!!, quantity.await()))
    }

    suspend fun findOne(request: ServerRequest): ServerResponse {
        val id = request.pathVariable("id").toInt()
        return productRepository.getProductById(id)?.let 
          { ServerResponse.ok().json().bodyValueAndAwait(it) }
          ?: ServerResponse.notFound().buildAndAwait()
    }
}

与 Controller 相比,主要区别在于参数和返回类型。使用 DSL + 协程,可以写出非常流畅简洁的路由定义。

8. 总结

本文深入探讨了 如何将 Kotlin 协程与 Spring 生态(特别是 R2DBC 和 WebFlux)集成

采用非阻塞性能方案能显著提升应用的性能和可扩展性。更重要的是,✅ Kotlin 协程让复杂的异步代码变得直观、易于理解和维护,减少了回调地狱和响应式链式调用的认知负担。

⚠️ 注意:文中涉及的库在中期版本迭代中可能变化较大,不同小版本间可能存在兼容性问题,请密切关注官方文档。

示例代码一如既往地托管在 GitHub 上。


原始标题:Non-Blocking Spring Boot with Kotlin Coroutines