1. 概述

R2DBC(Reactive Relational Database Connectivity) 是 Pivotal 在 2018 年 Spring One Platform 大会上提出的一个项目。其目标是为 SQL 数据库提供一个响应式 API 接口,从而支持非阻塞数据库访问。

R2DBC 的设计初衷是为了解决传统 JDBC 在响应式编程模型下的不足。JDBC 是阻塞式的,无法很好地融入响应式流(Reactive Streams)体系,而 R2DBC 则基于 Reactive Streams 规范,提供了异步非阻塞的数据库访问能力。

2. 搭建第一个 Spring Data R2DBC 项目

要使用 Spring Data R2DBC,首先需要引入以下核心依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-r2dbc</artifactId>
    </dependency>
    <dependency>
        <groupId>io.r2dbc</groupId>
        <artifactId>r2dbc-h2</artifactId>
    </dependency>
</dependencies>

此外,建议同时引入 Lombok、Spring WebFlux 等工具库,以简化实体类定义和构建响应式 Web 应用。完整依赖可参考 GitHub 示例项目

3. 配置连接工厂

使用数据库的第一步是配置连接工厂(ConnectionFactory)。R2DBC 提供了 AbstractR2dbcConfiguration 类来简化配置流程。

示例代码如下:

@Configuration
@EnableR2dbcRepositories
class R2DBCConfiguration extends AbstractR2dbcConfiguration {
    @Bean
    public H2ConnectionFactory connectionFactory() {
        return new H2ConnectionFactory(
            H2ConnectionConfiguration.builder()
              .url("mem:testdb;DB_CLOSE_DELAY=-1;")
              .username("sa")
              .build()
        );
    }
}

注意:

  • @EnableR2dbcRepositories:启用 Spring Data R2DBC 的自动仓库扫描和注册。
  • H2ConnectionFactory:用于连接 H2 内存数据库,适合本地开发和测试。

4. 编写第一个 R2DBC 应用

4.1 定义 Repository

Spring Data R2DBC 提供了 ReactiveCrudRepository 接口,用于快速实现响应式 CRUD 操作:

interface PlayerRepository extends ReactiveCrudRepository<Player, Integer> {}

4.2 定义实体类

借助 Lombok 简化实体类定义:

@Data
@NoArgsConstructor
@AllArgsConstructor
class Player {
    @Id
    Integer id;
    String name;
    Integer age;
}
  • @Id 注解用于标识主键字段。
  • 使用 Lombok 的 @Data 自动生成 getter/setter、toString、equals 等方法。

5. 测试代码

使用 StepVerifier 编写响应式测试用例,验证数据操作是否符合预期:

@Test
public void whenDeleteAll_then0IsExpected() {
    playerRepository.deleteAll()
      .as(StepVerifier::create)
      .expectNextCount(0)
      .verifyComplete();
}

@Test
public void whenInsert6_then6AreExpected() {
    insertPlayers();
    playerRepository.findAll()
      .as(StepVerifier::create)
      .expectNextCount(6)
      .verifyComplete();
}

✅ 提示:StepVerifier 是 Project Reactor 提供的测试工具,用于验证响应式流的行为。

6. 自定义查询

除了基础的 CRUD 操作,R2DBC 还支持通过 @Query 注解定义自定义 SQL 查询:

@Query("select id, name, age from player where name = $1")
Flux<Player> findAllByName(String name);

@Query("select * from player where age = $1")
Flux<Player> findByAge(int age);

对应的测试用例如下:

@Test
public void whenSearchForCR7_then1IsExpected() {
    insertPlayers();
    playerRepository.findAllByName("CR7")
      .as(StepVerifier::create)
      .expectNextCount(1)
      .verifyComplete();
}

@Test
public void whenSearchFor32YearsOld_then2AreExpected() {
    insertPlayers();
    playerRepository.findByAge(32)
      .as(StepVerifier::create)
      .expectNextCount(2)
      .verifyComplete();
}

private void insertPlayers() {
    List<Player> players = Arrays.asList(
        new Player(1, "Kaka", 37),
        new Player(2, "Messi", 32),
        new Player(3, "Mbappé", 20),
        new Player(4, "CR7", 34),
        new Player(5, "Lewandowski", 30),
        new Player(6, "Cavani", 32)
    );
    playerRepository.saveAll(players).subscribe();
}

⚠️ 注意:SQL 中参数使用 $1$2 表示,而不是 ?

7. 使用 R2dbcEntityTemplate

Spring Boot 2.4 引入了 R2dbcEntityTemplate,它提供了一套 Fluent API,用于执行常见的数据库操作,适合非 Repository 场景使用。

7.1 初始化模板

ConnectionFactory connectionFactory = ConnectionFactories.get(
  "r2dbc:h2:mem:///testdb?options=DB_CLOSE_DELAY=-1;TRACE_LEVEL_FILE=4;USER=sa;PASSWORD="
);

R2dbcEntityTemplate template = new R2dbcEntityTemplate(connectionFactory);

7.2 插入数据

void insertPlayers() {
    List<Player> players = Arrays.asList(
      new Player(null, "Saka", 22), 
      new Player(null, "Pedro", 32), 
      new Player(null, "Mbappé", 20)
    );

    for (Player player : players) {
        template.insert(Player.class)
          .using(player)
          .as(StepVerifier::create)
          .expectNextCount(1)
          .verifyComplete();
    }
}

7.3 查询数据

@Test
void whenInsertThreePlayers_thenThreeAreExpected() {
    insertPlayers();

    template.select(Player.class)
      .all()
      .as(StepVerifier::create)
      .expectNextCount(3)
      .verifyComplete();
}

7.4 自定义查询

@Test
void whenSearchForSaka_thenOneIsExpected() {
    insertPlayers();

    template.select(Player.class)
      .matching(query(where("name").is("Saka")))
      .one()
      .as(StepVerifier::create)
      .expectNextCount(1)
      .verifyComplete();
}

R2dbcEntityTemplate 支持 insert、select、update、delete 等多种操作,非常适合在 Service 层灵活使用。

8. 批处理操作

R2DBC 支持通过 Batch 实现多条 SQL 语句的批量执行,提升性能。

8.1 创建并执行 Batch

@Test
public void whenBatchHas2Operations_then2AreExpected() {
    Mono.from(factory.create())
      .flatMapMany(connection -> Flux.from(connection
        .createBatch()
        .add("select * from player")
        .add("select * from player")
        .execute()))
      .as(StepVerifier::create)
      .expectNextCount(2)
      .verifyComplete();
}
  • Batch 支持添加多个 SQL 语句。
  • execute() 返回一个 Publisher,每个语句的执行结果都会作为一项发出。

9. 总结

R2DBC 是 Spring 响应式编程生态中的重要组成部分,它填补了传统 JDBC 在响应式编程模型中的空白。结合 Spring WebFlux,我们可以构建从上到下全链路异步的响应式应用。

优点总结:

  • 支持异步非阻塞数据库访问
  • 与 Spring Data 整合良好
  • 支持多种数据库驱动(如 PostgreSQL、MySQL、H2)

注意点:

  • 不支持所有 SQL 数据库(需有 R2DBC 驱动)
  • 事务支持有限,部分数据库需手动管理

完整的示例代码可参考 GitHub:GitHub 示例地址


原始标题:A Quick Look at R2DBC

« 上一篇: Java周报,290
» 下一篇: Java 中的代理模式