1. 概述

在本教程中,我们将介绍如何使用 R2DBC(Reactive Relational Database Connectivity)以响应式的方式执行数据库操作。

为了深入理解 R2DBC,我们会构建一个简单的 Spring WebFlux REST 应用,实现一个实体的 CRUD 操作,并全程使用异步非阻塞方式完成数据库访问,避免阻塞主线程。


2. 什么是 R2DBC?

响应式开发正变得越来越流行,新的框架不断涌现,已有框架的使用率也在上升。但 Java/JVM 生态中一个长期存在的问题是:数据库访问仍然基本是同步的。这源于 JDBC 的设计方式,也导致了在响应式编程中不得不使用各种“打补丁”的方式来兼容两者。

为了解决 Java 中异步数据库访问的需求,出现了两个标准:

  • ADBA(Asynchronous Database Access API):由 Oracle 推动,但目前进展缓慢,缺乏明确的发布时间表。
  • R2DBC(Reactive Relational Database Connectivity):由 Pivotal 及其他公司主导的社区项目,目前仍处于 beta 阶段,但已展现出较强的活力,并提供了对 Postgres、H2 和 MSSQL 的驱动支持。

本教程将重点介绍 R2DBC 的使用。


3. 项目搭建

使用 R2DBC 需要引入核心 API 和对应的数据库驱动。我们以 H2 数据库为例,只需添加如下依赖:

<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-h2</artifactId>
    <version>1.0.0.RELEASE</version>
</dependency>

⚠️ r2dbc-spi 会作为 r2dbc-h2 的传递依赖自动引入,无需手动添加。


4. ConnectionFactory 配置

使用 R2DBC 的第一步是创建 ConnectionFactory,它类似于 JDBC 中的 DataSource。创建方式通常通过 ConnectionFactories 工具类完成。

下面是一个创建 ConnectionFactory Bean 的示例:

@Bean
public ConnectionFactory connectionFactory(R2DBCConfigurationProperties properties) {
    ConnectionFactoryOptions baseOptions = ConnectionFactoryOptions.parse(properties.getUrl());
    Builder ob = ConnectionFactoryOptions.builder().from(baseOptions);
    if (!StringUtil.isNullOrEmpty(properties.getUser())) {
        ob = ob.option(USER, properties.getUser());
    }
    if (!StringUtil.isNullOrEmpty(properties.getPassword())) {
        ob = ob.option(PASSWORD, properties.getPassword());
    }        
    return ConnectionFactories.get(ob.build());    
}

这段代码从配置类中获取连接 URL、用户名和密码,构建出一个 ConnectionFactory Bean。

R2DBC 连接字符串示例

r2dbc:h2:mem://./testdb

解析如下:

  • r2dbc:固定协议标识符(也可以是 r2dbc+ssl
  • h2:驱动标识符
  • mem:驱动特定协议,表示内存数据库
  • //./testdb:数据库名或主机信息

5. 执行 SQL 语句

与 JDBC 类似,R2DBC 的核心是发送 SQL 语句并处理结果集。但由于它是响应式 API,因此大量使用了响应式流类型,如 PublisherSubscriber

为简化开发,我们通常使用 Reactor 提供的 MonoFlux 来处理响应式流。

5.1 获取数据库连接

首先,我们需要从 ConnectionFactory 获取一个 Connection 实例。由于是响应式 API,create() 方法返回的是一个 Publisher<Connection>,我们将其包装为 Mono

public Mono<Account> findById(Long id) {         
    return Mono.from(connectionFactory.create())
      .flatMap(c ->
          // 使用连接
      );
}

5.2 准备并执行语句

拿到连接后,我们就可以创建 SQL 语句并执行:

.flatMap(c -> 
    Mono.from(c.createStatement("select id,iban,balance from Account where id = $1")
      .bind("$1", id)
      .execute())
      .doFinally((st) -> close(c))
)

注意:

  • createStatement() 是同步操作,可以链式调用。
  • 占位符语法是驱动特定的:
    • H2 使用 $1, $2
    • PostgreSQL 使用 $1, $2
    • MySQL 使用 ?
    • Oracle 使用 :1, :2

⚠️ 这一点在迁移旧代码时要特别注意!

5.3 处理查询结果

接下来我们需要处理返回的 Result 对象,并将其映射为业务对象:

.map(result -> result.map((row, meta) -> 
    new Account(row.get("id", Long.class),
      row.get("iban", String.class),
      row.get("balance", BigDecimal.class))))
.flatMap(p -> Mono.from(p));
  • result.map() 接收两个参数:RowRowMetadata
  • 我们从 Row 中提取字段值,构造 Account 对象
  • 由于返回的是 Mono<Publisher<Account>>,我们再用 flatMap 转换为 Mono<Account>

5.4 批量执行语句

R2DBC 也支持批量执行 SQL 语句,适用于 ETL 等场景:

@Bean
public CommandLineRunner initDatabase(ConnectionFactory cf) {
    return (args) ->
      Flux.from(cf.create())
        .flatMap(c -> 
            Flux.from(c.createBatch()
              .add("drop table if exists Account")
              .add("create table Account(" +
                "id IDENTITY(1,1)," +
                "iban varchar(80) not null," +
                "balance DECIMAL(18,2) not null)")
              .add("insert into Account(iban,balance)" +
                "values('BR430120980198201982',100.00)")
              .add("insert into Account(iban,balance)" +
                "values('BR430120998729871000',250.00)")
              .execute())
            .doFinally((st) -> c.close())
          )
        .log()
        .blockLast();
}
  • 使用 createBatch() 创建批量语句
  • add() 添加 SQL 语句
  • execute() 提交执行

⚠️ 批量语句不支持参数绑定,适合执行固定语句。


6. 事务管理

R2DBC 支持事务管理,与 JDBC 类似,通过 Connection 对象控制。

我们来看一个创建账户并提交事务的示例:

public Mono<Account> createAccount(Account account) {
    return Mono.from(connectionFactory.create())
      .flatMap(c -> Mono.from(c.beginTransaction())
        .then(Mono.from(c.createStatement("insert into Account(iban,balance) values($1,$2)")
          .bind("$1", account.getIban())
          .bind("$2", account.getBalance())
          .returnGeneratedValues("id")
          .execute()))
        .map(result -> result.map((row, meta) -> 
            new Account(row.get("id", Long.class),
              account.getIban(),
              account.getBalance())))
        .flatMap(pub -> Mono.from(pub))
        .delayUntil(r -> c.commitTransaction())
        .doFinally((st) -> c.close()));   
}

关键点:

  • 使用 beginTransaction() 开启事务
  • 插入数据时使用 returnGeneratedValues("id") 获取自动生成的主键
  • 使用 commitTransaction() 提交事务
  • doFinally() 确保连接被关闭

7. 示例 DAO 的使用

有了响应式 DAO,我们就可以轻松构建一个 Spring WebFlux 应用。例如,一个获取账户信息的接口如下:

@RestController
public class AccountResource {
    private final ReactiveAccountDao accountDao;

    public AccountResource(ReactiveAccountDao accountDao) {
        this.accountDao = accountDao;
    }

    @GetMapping("/accounts/{id}")
    public Mono<ResponseEntity<Account>> getAccount(@PathVariable("id") Long id) {
        return accountDao.findById(id)
          .map(acc -> new ResponseEntity<>(acc, HttpStatus.OK))
          .switchIfEmpty(Mono.just(new ResponseEntity<>(null, HttpStatus.NOT_FOUND)));
    }
}
  • 使用 Mono 构建响应式返回值
  • switchIfEmpty() 处理未找到记录的情况,返回 404

8. 总结

本教程介绍了如何使用 R2DBC 实现响应式数据库操作。虽然 R2DBC 仍处于早期阶段,但它的社区活跃度和驱动支持已经展现出不错的前景。

相比 ADBA,R2DBC 更具实用性,目前已支持 Postgres、H2、MSSQL 等数据库(但 Oracle 尚未支持)。

完整代码示例可参考 GitHub 仓库

适合场景:需要构建响应式、非阻塞、高并发的数据库访问层
不适合场景:传统同步项目或对 Oracle 支持有强依赖的项目

如果你正在构建微服务或响应式系统,R2DBC 是一个值得尝试的技术方向。


原始标题:R2DBC – Reactive Relational Database Connectivity | Baeldung