1. 概述
Kotlin Coroutines 能显著提升异步、回调密集型代码的可读性。
本文将深入探讨如何利用协程构建非阻塞的 Spring Boot 应用,并对比响应式编程与协程两种实现方式的差异与适用场景。
2. 为什么选择协程
现代系统常需处理成千上万甚至百万级的并发请求,因此开发领域正逐步转向非阻塞计算和请求处理。通过将 I/O 操作从核心线程中卸载,能更高效地利用系统资源,从而支撑远超传统“每个请求一个线程”模型的并发量。
异步处理本身复杂且易出错。幸运的是,我们有诸如 Java 的 CompletableFutures 或 RxJava 这样的响应式库来应对这种复杂性。Spring 框架也早已通过 Reactor 和 WebFlux 提供了成熟的响应式支持。
然而,响应式代码(尤其是链式调用)往往难以阅读和调试。Kotlin 语言提供的协程(Coroutines)概念,允许开发者以近乎同步的顺序风格编写并发和异步代码,极大地提升了代码的可读性和维护性。
协程非常灵活,通过 Job 和 Scope 可以精细控制任务的执行。此外,✅ Kotlin 协程能完美地与现有的 Java 非阻塞框架(如 Reactor)协同工作。
Spring 从 5.2 版本开始正式为 Kotlin 协程提供原生支持。
3. 项目搭建
首先,我们需要引入必要的依赖。
我们将使用 Netty 作为底层的异步、事件驱动的客户端-服务器框架。Netty 将作为嵌入式的响应式 Web 服务器实现。当然,由于 Servlet 3.0 规范已支持非阻塞请求处理,你也可以选择 Jetty 或 Tomcat 等 Servlet 容器。
以下是基于 Spring Boot 3.1.4 的关键依赖配置:
<properties>
<kotlin.version>1.7.0</kotlin.version>
<r2dbc.version>1.0.0.RELEASE</r2dbc.version>
<r2dbc-spi.version>1.0.0.RELEASE</r2dbc-spi.version>
<h2-r2dbc.version>1.0.0.RELEASE</h2-r2dbc.version>
<spring-boot.version>3.1.4.RELEASE</spring-boot.version>
</properties>
⚠️ **最关键的一步:必须使用 spring-boot-starter-webflux
而非 spring-boot-starter-web
**,因为我们要依赖 WebFlux 进行异步处理:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>${spring-boot.version}</version>
</dependency>
接下来,添加 R2DBC 依赖以支持响应式数据库访问:
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
<version>${h2-r2dbc.version}</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-spi</artifactId>
<version>${r2dbc-spi.version}</version>
</dependency>
最后,添加 Kotlin 核心及协程相关依赖:
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-reactor</artifactId>
</dependency>
4. Spring Data R2DBC 与协程
本节重点介绍如何以响应式和协程两种风格访问数据库。
4.1. 响应式 R2DBC
R2DBC 是一个 API 规范,定义了由数据库厂商实现的响应式 API。我们的数据存储将使用内存中的 H2 database,同时 PostgreSQL 和 SQL Server 也有对应的响应式驱动。
首先,使用响应式方式实现一个简单的仓库(Repository):
@Repository
class ProductRepository(private val client: DatabaseClient) {
fun getProductById(id: Int): Mono<Product> {
return client.execute()
.sql("SELECT * FROM products WHERE id = $1")
.bind(0, id)
.`as`(Product::class.java)
.fetch()
.one()
}
fun addNewProduct(name: String, price: Float): Mono<Void> {
return client.execute()
.sql("INSERT INTO products (name, price) VALUES($1, $2)")
.bind(0, name)
.bind(1, price)
.then()
}
fun getAllProducts(): Flux<Product> {
return client.select()
.from("products")
.`as`(Product::class.java)
.fetch()
.all()
}
}
这里使用了非阻塞的 DatabaseClient
执行数据库查询。接下来,我们将其重构为协程风格。
4.2. R2DBC 与协程
要将响应式函数转换为协程 API,核心是使用 suspend
修饰符并调整返回类型:
- 无返回值函数:去掉
Mono<Void>
,直接声明为suspend fun
。 - 单个结果函数:将
Mono<T>
替换为T?
(或T
),移除包装。 - 多个结果函数:将
Flux<T>
替换为Flow<T>
。
应用这些规则进行重构:
@Repository
class ProductRepositoryCoroutines(private val client: DatabaseClient) {
suspend fun getProductById(id: Int): Product? =
client.execute()
.sql("SELECT * FROM products WHERE id = $1")
.bind(0, id)
.`as`(Product::class.java)
.fetch()
.one()
.awaitFirstOrNull()
suspend fun addNewProduct(name: String, price: Float) =
client.execute()
.sql("INSERT INTO products (name, price) VALUES($1, $2)")
.bind(0, name)
.bind(1, price)
.then()
.awaitFirstOrNull()
@FlowPreview
fun getAllProducts(): Flow<Product> =
client.select()
.from("products")
.`as`(Product::class.java)
.fetch()
.all()
.asFlow()
}
需要注意以下几点:
-
awaitFirstOrNull()
等扩展函数来自kotlin-coroutines-reactive
库。 - 更多有用的扩展可以在 spring-data-r2dbc 库 中找到。
5. Spring WebFlux 控制器
现在我们有了仓库层,接下来创建非阻塞的控制器(Controller)。
5.1. 响应式控制器
先看熟悉的响应式风格:
@RestController
class ProductController {
@Autowired
lateinit var productRepository: ProductRepository
@GetMapping("/{id}")
fun findOne(@PathVariable id: Int): Mono<Product> {
return productRepository.getProductById(id)
}
@GetMapping("/")
fun findAll(): Flux<Product> {
return productRepository.getAllProducts()
}
}
关键问题:实际的 I/O 操作在哪个线程上执行? 默认情况下,每个查询操作都在 Reactor 的 NIO 线程池中的某个线程上运行,由底层调度器(Scheduler)决定。
5.2. 协程控制器
使用协程重构控制器:
@RestController
class ProductControllerCoroutines {
@Autowired
lateinit var productRepository: ProductRepositoryCoroutines
@GetMapping("/{id}")
suspend fun findOne(@PathVariable id: Int): Product? {
return productRepository.getProductById(id)
}
@FlowPreview
@GetMapping("/")
fun findAll(): Flow<Product> {
return productRepository.getAllProducts()
}
}
注意:
-
findAll()
返回Flow
,它内部会调用挂起函数,但自身不是挂起函数。 - 数据库查询仍在相同的 Reactor NIO 线程上执行。
6. Spring WebFlux WebClient
假设系统中存在微服务架构,需要调用其他服务获取数据(例如商品库存)。
6.1. 响应式 WebClient
使用 WebClient 发起请求:
@GetMapping("/{id}/stock")
fun findOneInStock(@PathVariable id: Int): Mono<ProductStockView> {
val product = productRepository.getProductById(id)
val stockQuantity = webClient.get()
.uri("/stock-service/product/$id/quantity")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono<Int>()
return product.zipWith(stockQuantity) {
productInStock, stockQty ->
ProductStockView(productInStock, stockQty)
}
}
使用 zipWith
组合两个 Mono
,等待两者完成后再合并结果。
6.2. WebClient 与协程
使用协程时,通过 awaitBody()
扩展函数发起请求:
@GetMapping("/{id}/stock")
suspend fun findOneInStock(@PathVariable id: Int): ProductStockView {
val product = productRepository.getProductById(id)
val quantity = webClient.get()
.uri("/stock-service/product/$id/quantity")
.accept(APPLICATION_JSON)
.retrieve()
.awaitBody<Int>()
return ProductStockView(product!!, quantity)
}
这看起来像同步代码,但实际上是异步的。这是协程带来的巨大可读性优势。
⚠️ 踩坑点:上述代码中,数据库查询和 Web 请求是串行执行的!因为协程默认是顺序的。
如何并行执行?使用 async
构建并行分解:
@GetMapping("/{id}/stock")
suspend fun findOneInStock(@PathVariable id: Int): ProductStockView = coroutineScope {
val product: Deferred<Product?> = async(start = CoroutineStart.LAZY) {
productRepository.getProductById(id)
}
val quantity: Deferred<Int> = async(start = CoroutineStart.LAZY) {
webClient.get()
.uri("/stock-service/product/$id/quantity")
.accept(APPLICATION_JSON)
.retrieve().awaitBody<Int>()
}
product.start()
quantity.start()
ProductStockView(product.await()!!, quantity.await())
}
要点:
-
async
创建Deferred
对象,代表一个异步计算。 - 使用
CoroutineStart.LAZY
防止立即启动。 - 显式调用
start()
启动任务,实现并行。 -
coroutineScope
确保所有子协程完成后才退出,且不阻塞当前线程。
底层机制:async
块内的函数被分发到独立的工作线程,而真正的 I/O 操作仍在 Reactor 的 NIO 线程池中执行。
7. WebFlux.fn DSL 路由
最后,看看如何在 WebFlux 的函数式 DSL 路由中使用协程。
Spring 提供了 coRouter {}
DSL 来定义协程友好的路由:
@Configuration
class RouterConfiguration {
@FlowPreview
@Bean
fun productRoutes(productsHandler: ProductsHandler) = coRouter {
GET("/", productsHandler::findAll)
GET("/{id}", productsHandler::findOne)
GET("/{id}/stock", productsHandler::findOneInStock)
}
}
实现对应的 ProductsHandler
:
@Component
class ProductsHandler(
@Autowired var webClient: WebClient,
@Autowired var productRepository: ProductRepositoryCoroutines) {
@FlowPreview
suspend fun findAll(request: ServerRequest): ServerResponse =
ServerResponse.ok().json().bodyAndAwait(productRepository.getAllProducts())
suspend fun findOneInStock(request: ServerRequest): ServerResponse {
val id = request.pathVariable("id").toInt()
val product: Deferred<Product?> = GlobalScope.async {
productRepository.getProductById(id)
}
val quantity: Deferred<Int> = GlobalScope.async {
webClient.get()
.uri("/stock-service/product/$id/quantity")
.accept(MediaType.APPLICATION_JSON)
.retrieve().awaitBody<Int>()
}
return ServerResponse.ok()
.json()
.bodyValueAndAwait(ProductStockView(product.await()!!, quantity.await()))
}
suspend fun findOne(request: ServerRequest): ServerResponse {
val id = request.pathVariable("id").toInt()
return productRepository.getProductById(id)?.let
{ ServerResponse.ok().json().bodyValueAndAwait(it) }
?: ServerResponse.notFound().buildAndAwait()
}
}
与 Controller 相比,主要区别在于参数和返回类型。使用 DSL + 协程,可以写出非常流畅简洁的路由定义。
8. 总结
本文深入探讨了 如何将 Kotlin 协程与 Spring 生态(特别是 R2DBC 和 WebFlux)集成。
采用非阻塞性能方案能显著提升应用的性能和可扩展性。更重要的是,✅ Kotlin 协程让复杂的异步代码变得直观、易于理解和维护,减少了回调地狱和响应式链式调用的认知负担。
⚠️ 注意:文中涉及的库在中期版本迭代中可能变化较大,不同小版本间可能存在兼容性问题,请密切关注官方文档。
示例代码一如既往地托管在 GitHub 上。