1. 概述
在本教程中,我们将介绍如何使用 R2DBC(Reactive Relational Database Connectivity)以响应式的方式执行数据库操作。
为了深入理解 R2DBC,我们会构建一个简单的 Spring WebFlux REST 应用,实现一个实体的 CRUD 操作,并全程使用异步非阻塞方式完成数据库访问,避免阻塞主线程。
2. 什么是 R2DBC?
响应式开发正变得越来越流行,新的框架不断涌现,已有框架的使用率也在上升。但 Java/JVM 生态中一个长期存在的问题是:数据库访问仍然基本是同步的。这源于 JDBC 的设计方式,也导致了在响应式编程中不得不使用各种“打补丁”的方式来兼容两者。
为了解决 Java 中异步数据库访问的需求,出现了两个标准:
- ADBA(Asynchronous Database Access API):由 Oracle 推动,但目前进展缓慢,缺乏明确的发布时间表。
- ✅ R2DBC(Reactive Relational Database Connectivity):由 Pivotal 及其他公司主导的社区项目,目前仍处于 beta 阶段,但已展现出较强的活力,并提供了对 Postgres、H2 和 MSSQL 的驱动支持。
本教程将重点介绍 R2DBC 的使用。
3. 项目搭建
使用 R2DBC 需要引入核心 API 和对应的数据库驱动。我们以 H2 数据库为例,只需添加如下依赖:
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
⚠️
r2dbc-spi
会作为r2dbc-h2
的传递依赖自动引入,无需手动添加。
4. ConnectionFactory 配置
使用 R2DBC 的第一步是创建 ConnectionFactory
,它类似于 JDBC 中的 DataSource
。创建方式通常通过 ConnectionFactories
工具类完成。
下面是一个创建 ConnectionFactory
Bean 的示例:
@Bean
public ConnectionFactory connectionFactory(R2DBCConfigurationProperties properties) {
ConnectionFactoryOptions baseOptions = ConnectionFactoryOptions.parse(properties.getUrl());
Builder ob = ConnectionFactoryOptions.builder().from(baseOptions);
if (!StringUtil.isNullOrEmpty(properties.getUser())) {
ob = ob.option(USER, properties.getUser());
}
if (!StringUtil.isNullOrEmpty(properties.getPassword())) {
ob = ob.option(PASSWORD, properties.getPassword());
}
return ConnectionFactories.get(ob.build());
}
这段代码从配置类中获取连接 URL、用户名和密码,构建出一个 ConnectionFactory
Bean。
R2DBC 连接字符串示例
r2dbc:h2:mem://./testdb
解析如下:
r2dbc
:固定协议标识符(也可以是r2dbc+ssl
)h2
:驱动标识符mem
:驱动特定协议,表示内存数据库//./testdb
:数据库名或主机信息
5. 执行 SQL 语句
与 JDBC 类似,R2DBC 的核心是发送 SQL 语句并处理结果集。但由于它是响应式 API,因此大量使用了响应式流类型,如 Publisher
和 Subscriber
。
为简化开发,我们通常使用 Reactor 提供的 Mono
和 Flux
来处理响应式流。
5.1 获取数据库连接
首先,我们需要从 ConnectionFactory
获取一个 Connection
实例。由于是响应式 API,create()
方法返回的是一个 Publisher<Connection>
,我们将其包装为 Mono
:
public Mono<Account> findById(Long id) {
return Mono.from(connectionFactory.create())
.flatMap(c ->
// 使用连接
);
}
5.2 准备并执行语句
拿到连接后,我们就可以创建 SQL 语句并执行:
.flatMap(c ->
Mono.from(c.createStatement("select id,iban,balance from Account where id = $1")
.bind("$1", id)
.execute())
.doFinally((st) -> close(c))
)
注意:
createStatement()
是同步操作,可以链式调用。- 占位符语法是驱动特定的:
- H2 使用
$1
,$2
- PostgreSQL 使用
$1
,$2
- MySQL 使用
?
- Oracle 使用
:1
,:2
- H2 使用
⚠️ 这一点在迁移旧代码时要特别注意!
5.3 处理查询结果
接下来我们需要处理返回的 Result
对象,并将其映射为业务对象:
.map(result -> result.map((row, meta) ->
new Account(row.get("id", Long.class),
row.get("iban", String.class),
row.get("balance", BigDecimal.class))))
.flatMap(p -> Mono.from(p));
result.map()
接收两个参数:Row
和RowMetadata
- 我们从
Row
中提取字段值,构造Account
对象 - 由于返回的是
Mono<Publisher<Account>>
,我们再用flatMap
转换为Mono<Account>
5.4 批量执行语句
R2DBC 也支持批量执行 SQL 语句,适用于 ETL 等场景:
@Bean
public CommandLineRunner initDatabase(ConnectionFactory cf) {
return (args) ->
Flux.from(cf.create())
.flatMap(c ->
Flux.from(c.createBatch()
.add("drop table if exists Account")
.add("create table Account(" +
"id IDENTITY(1,1)," +
"iban varchar(80) not null," +
"balance DECIMAL(18,2) not null)")
.add("insert into Account(iban,balance)" +
"values('BR430120980198201982',100.00)")
.add("insert into Account(iban,balance)" +
"values('BR430120998729871000',250.00)")
.execute())
.doFinally((st) -> c.close())
)
.log()
.blockLast();
}
- 使用
createBatch()
创建批量语句 add()
添加 SQL 语句execute()
提交执行
⚠️ 批量语句不支持参数绑定,适合执行固定语句。
6. 事务管理
R2DBC 支持事务管理,与 JDBC 类似,通过 Connection
对象控制。
我们来看一个创建账户并提交事务的示例:
public Mono<Account> createAccount(Account account) {
return Mono.from(connectionFactory.create())
.flatMap(c -> Mono.from(c.beginTransaction())
.then(Mono.from(c.createStatement("insert into Account(iban,balance) values($1,$2)")
.bind("$1", account.getIban())
.bind("$2", account.getBalance())
.returnGeneratedValues("id")
.execute()))
.map(result -> result.map((row, meta) ->
new Account(row.get("id", Long.class),
account.getIban(),
account.getBalance())))
.flatMap(pub -> Mono.from(pub))
.delayUntil(r -> c.commitTransaction())
.doFinally((st) -> c.close()));
}
关键点:
- 使用
beginTransaction()
开启事务 - 插入数据时使用
returnGeneratedValues("id")
获取自动生成的主键 - 使用
commitTransaction()
提交事务 doFinally()
确保连接被关闭
7. 示例 DAO 的使用
有了响应式 DAO,我们就可以轻松构建一个 Spring WebFlux 应用。例如,一个获取账户信息的接口如下:
@RestController
public class AccountResource {
private final ReactiveAccountDao accountDao;
public AccountResource(ReactiveAccountDao accountDao) {
this.accountDao = accountDao;
}
@GetMapping("/accounts/{id}")
public Mono<ResponseEntity<Account>> getAccount(@PathVariable("id") Long id) {
return accountDao.findById(id)
.map(acc -> new ResponseEntity<>(acc, HttpStatus.OK))
.switchIfEmpty(Mono.just(new ResponseEntity<>(null, HttpStatus.NOT_FOUND)));
}
}
- 使用
Mono
构建响应式返回值 switchIfEmpty()
处理未找到记录的情况,返回 404
8. 总结
本教程介绍了如何使用 R2DBC 实现响应式数据库操作。虽然 R2DBC 仍处于早期阶段,但它的社区活跃度和驱动支持已经展现出不错的前景。
相比 ADBA,R2DBC 更具实用性,目前已支持 Postgres、H2、MSSQL 等数据库(但 Oracle 尚未支持)。
完整代码示例可参考 GitHub 仓库。
✅ 适合场景:需要构建响应式、非阻塞、高并发的数据库访问层
❌ 不适合场景:传统同步项目或对 Oracle 支持有强依赖的项目
如果你正在构建微服务或响应式系统,R2DBC 是一个值得尝试的技术方向。