1. 概述
R2DBC(Reactive Relational Database Connectivity) 是 Pivotal 在 2018 年 Spring One Platform 大会上提出的一个项目。其目标是为 SQL 数据库提供一个响应式 API 接口,从而支持非阻塞数据库访问。
R2DBC 的设计初衷是为了解决传统 JDBC 在响应式编程模型下的不足。JDBC 是阻塞式的,无法很好地融入响应式流(Reactive Streams)体系,而 R2DBC 则基于 Reactive Streams 规范,提供了异步非阻塞的数据库访问能力。
2. 搭建第一个 Spring Data R2DBC 项目
要使用 Spring Data R2DBC,首先需要引入以下核心依赖:
<dependencies>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
</dependency>
</dependencies>
此外,建议同时引入 Lombok、Spring WebFlux 等工具库,以简化实体类定义和构建响应式 Web 应用。完整依赖可参考 GitHub 示例项目。
3. 配置连接工厂
使用数据库的第一步是配置连接工厂(ConnectionFactory)。R2DBC 提供了 AbstractR2dbcConfiguration
类来简化配置流程。
示例代码如下:
@Configuration
@EnableR2dbcRepositories
class R2DBCConfiguration extends AbstractR2dbcConfiguration {
@Bean
public H2ConnectionFactory connectionFactory() {
return new H2ConnectionFactory(
H2ConnectionConfiguration.builder()
.url("mem:testdb;DB_CLOSE_DELAY=-1;")
.username("sa")
.build()
);
}
}
注意:
@EnableR2dbcRepositories
:启用 Spring Data R2DBC 的自动仓库扫描和注册。H2ConnectionFactory
:用于连接 H2 内存数据库,适合本地开发和测试。
4. 编写第一个 R2DBC 应用
4.1 定义 Repository
Spring Data R2DBC 提供了 ReactiveCrudRepository
接口,用于快速实现响应式 CRUD 操作:
interface PlayerRepository extends ReactiveCrudRepository<Player, Integer> {}
4.2 定义实体类
借助 Lombok 简化实体类定义:
@Data
@NoArgsConstructor
@AllArgsConstructor
class Player {
@Id
Integer id;
String name;
Integer age;
}
@Id
注解用于标识主键字段。- 使用 Lombok 的
@Data
自动生成 getter/setter、toString、equals 等方法。
5. 测试代码
使用 StepVerifier 编写响应式测试用例,验证数据操作是否符合预期:
@Test
public void whenDeleteAll_then0IsExpected() {
playerRepository.deleteAll()
.as(StepVerifier::create)
.expectNextCount(0)
.verifyComplete();
}
@Test
public void whenInsert6_then6AreExpected() {
insertPlayers();
playerRepository.findAll()
.as(StepVerifier::create)
.expectNextCount(6)
.verifyComplete();
}
✅ 提示:StepVerifier
是 Project Reactor 提供的测试工具,用于验证响应式流的行为。
6. 自定义查询
除了基础的 CRUD 操作,R2DBC 还支持通过 @Query
注解定义自定义 SQL 查询:
@Query("select id, name, age from player where name = $1")
Flux<Player> findAllByName(String name);
@Query("select * from player where age = $1")
Flux<Player> findByAge(int age);
对应的测试用例如下:
@Test
public void whenSearchForCR7_then1IsExpected() {
insertPlayers();
playerRepository.findAllByName("CR7")
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
}
@Test
public void whenSearchFor32YearsOld_then2AreExpected() {
insertPlayers();
playerRepository.findByAge(32)
.as(StepVerifier::create)
.expectNextCount(2)
.verifyComplete();
}
private void insertPlayers() {
List<Player> players = Arrays.asList(
new Player(1, "Kaka", 37),
new Player(2, "Messi", 32),
new Player(3, "Mbappé", 20),
new Player(4, "CR7", 34),
new Player(5, "Lewandowski", 30),
new Player(6, "Cavani", 32)
);
playerRepository.saveAll(players).subscribe();
}
⚠️ 注意:SQL 中参数使用 $1
、$2
表示,而不是 ?
。
7. 使用 R2dbcEntityTemplate
Spring Boot 2.4 引入了 R2dbcEntityTemplate
,它提供了一套 Fluent API,用于执行常见的数据库操作,适合非 Repository 场景使用。
7.1 初始化模板
ConnectionFactory connectionFactory = ConnectionFactories.get(
"r2dbc:h2:mem:///testdb?options=DB_CLOSE_DELAY=-1;TRACE_LEVEL_FILE=4;USER=sa;PASSWORD="
);
R2dbcEntityTemplate template = new R2dbcEntityTemplate(connectionFactory);
7.2 插入数据
void insertPlayers() {
List<Player> players = Arrays.asList(
new Player(null, "Saka", 22),
new Player(null, "Pedro", 32),
new Player(null, "Mbappé", 20)
);
for (Player player : players) {
template.insert(Player.class)
.using(player)
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
}
}
7.3 查询数据
@Test
void whenInsertThreePlayers_thenThreeAreExpected() {
insertPlayers();
template.select(Player.class)
.all()
.as(StepVerifier::create)
.expectNextCount(3)
.verifyComplete();
}
7.4 自定义查询
@Test
void whenSearchForSaka_thenOneIsExpected() {
insertPlayers();
template.select(Player.class)
.matching(query(where("name").is("Saka")))
.one()
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
}
R2dbcEntityTemplate
支持 insert、select、update、delete 等多种操作,非常适合在 Service 层灵活使用。
8. 批处理操作
R2DBC 支持通过 Batch
实现多条 SQL 语句的批量执行,提升性能。
8.1 创建并执行 Batch
@Test
public void whenBatchHas2Operations_then2AreExpected() {
Mono.from(factory.create())
.flatMapMany(connection -> Flux.from(connection
.createBatch()
.add("select * from player")
.add("select * from player")
.execute()))
.as(StepVerifier::create)
.expectNextCount(2)
.verifyComplete();
}
Batch
支持添加多个 SQL 语句。execute()
返回一个Publisher
,每个语句的执行结果都会作为一项发出。
9. 总结
R2DBC 是 Spring 响应式编程生态中的重要组成部分,它填补了传统 JDBC 在响应式编程模型中的空白。结合 Spring WebFlux,我们可以构建从上到下全链路异步的响应式应用。
✅ 优点总结:
- 支持异步非阻塞数据库访问
- 与 Spring Data 整合良好
- 支持多种数据库驱动(如 PostgreSQL、MySQL、H2)
❌ 注意点:
- 不支持所有 SQL 数据库(需有 R2DBC 驱动)
- 事务支持有限,部分数据库需手动管理
完整的示例代码可参考 GitHub:GitHub 示例地址。