1. 概述

简单来说,rxjava-jdbc 是一个用于与关系型数据库交互的 API,支持流式风格的方法调用。在这篇快速教程中,我们将了解这个库的基本用法和常见功能。

如果你想学习 RxJava 基础,可以参考这篇文章

2. Maven 依赖

首先在 pom.xml 中添加依赖:

<dependency>
    <groupId>com.github.davidmoten</groupId>
    <artifactId>rxjava-jdbc</artifactId>
    <version>0.7.11</version>
</dependency>

最新版本可在 Maven Central 查找。

3. 核心组件

Database 类是执行所有数据库操作的主要入口点。通过实现 ConnectionProvider 接口并传入 from() 静态方法创建 Database 对象:

public static ConnectionProvider connectionProvider
  = new ConnectionProviderFromUrl(
  DB_CONNECTION, DB_USER, DB_PASSWORD);
Database db = Database.from(connectionProvider);

ConnectionProvider 有多种实现值得注意:

  • ConnectionProviderFromContext
  • ConnectionProviderFromDataSource
  • ConnectionProviderFromUrl
  • ConnectionProviderPooled

基础操作主要使用以下 API:

  • select() – 用于 SQL 查询
  • update() – 用于 DDL(如创建/删除表)和 DML(增删改)

4. 快速上手

下面演示如何执行基本数据库操作:

public class BasicQueryTypesTest {
    
    Observable<Integer> create,
      insert1, 
      insert2, 
      insert3, 
      update, 
      delete = null;
    
    @Test
    public void whenCreateTableAndInsertRecords_thenCorrect() {
        create = db.update(
          "CREATE TABLE IF NOT EXISTS EMPLOYEE("
          + "id int primary key, name varchar(255))")
          .count();
        insert1 = db.update(
          "INSERT INTO EMPLOYEE(id, name) VALUES(1, 'John')")
          .dependsOn(create)
          .count();
        update = db.update(
          "UPDATE EMPLOYEE SET name = 'Alan' WHERE id = 1")
          .dependsOn(create)
          .count();
        insert2 = db.update(
          "INSERT INTO EMPLOYEE(id, name) VALUES(2, 'Sarah')")
          .dependsOn(create)
          .count();
        insert3 = db.update(
          "INSERT INTO EMPLOYEE(id, name) VALUES(3, 'Mike')")
          .dependsOn(create)
          .count();
        delete = db.update(
          "DELETE FROM EMPLOYEE WHERE id = 2")
          .dependsOn(create)
          .count();
        List<String> names = db.select(
          "select name from EMPLOYEE where id < ?")
          .parameter(3)
          .dependsOn(create)
          .dependsOn(insert1)
          .dependsOn(insert2)
          .dependsOn(insert3)
          .dependsOn(update)
          .dependsOn(delete)
          .getAs(String.class)
          .toList()
          .toBlocking()
          .single();
        
        assertEquals(Arrays.asList("Alan"), names);
    }
}

⚠️ 注意:必须通过 dependsOn() 明确声明操作顺序,否则可能产生不可预测的结果或直接报错。

5. 自动映射

自动映射功能可将数据库记录直接映射为对象,支持两种方式:

5.1. 基于接口的映射

通过注解接口实现映射。先定义接口:

public interface Employee {

    @Column("id")
    int id();

    @Column("name")
    String name();
}

使用示例:

@Test
public void whenSelectFromTableAndAutomap_thenCorrect() {
    List<Employee> employees = db.select("select id, name from EMPLOYEE")
      .dependsOn(create)
      .dependsOn(insert1)
      .dependsOn(insert2)
      .autoMap(Employee.class)
      .toList()
      .toBlocking()
      .single();
    
    assertThat(
      employees.get(0).id()).isEqualTo(1);
    assertThat(
      employees.get(0).name()).isEqualTo("Alan");
    assertThat(
      employees.get(1).id()).isEqualTo(2);
    assertThat(
      employees.get(1).name()).isEqualTo("Sarah");
}

5.2. 基于类的映射

使用具体类实现映射。定义类:

public class Manager {

    private int id;
    private String name;

    // 标准构造函数、getter/setter
}

使用示例:

@Test
public void whenSelectManagersAndAutomap_thenCorrect() {
    List<Manager> managers = db.select("select id, name from MANAGER")
      .dependsOn(create)
      .dependsOn(insert1)
      .dependsOn(insert2)
      .autoMap(Manager.class)
      .toList()
      .toBlocking()
      .single();
    
    assertThat(
      managers.get(0).getId()).isEqualTo(1);
    assertThat(
     managers.get(0).getName()).isEqualTo("Alan");
    assertThat(
      managers.get(1).getId()).isEqualTo(2);
    assertThat(
      managers.get(1).getName()).isEqualTo("Sarah");
}

关键点

  • create/insert1/insert2 是创建表和插入数据的操作引用
  • 查询字段数量必须与类构造函数参数数量一致
  • 字段类型需支持自动映射(如 intINTEGER

更多映射细节参考 rxjava-jdbc 仓库

6. 处理大对象

支持 CLOB 和 BLOB 类型操作,以下是具体用法:

6.1. CLOB 操作

插入和查询 CLOB 示例:

@Before
public void setup() throws IOException {
    create = db.update(
      "CREATE TABLE IF NOT EXISTS " + 
      "SERVERLOG (id int primary key, document CLOB)")
        .count();
    
    InputStream actualInputStream
      = new FileInputStream("src/test/resources/actual_clob");
    actualDocument = getStringFromInputStream(actualInputStream);

    InputStream expectedInputStream = new FileInputStream(
      "src/test/resources/expected_clob");
 
    expectedDocument = getStringFromInputStream(expectedInputStream);
    insert = db.update(
      "insert into SERVERLOG(id,document) values(?,?)")
        .parameter(1)
        .parameter(Database.toSentinelIfNull(actualDocument))
      .dependsOn(create)
      .count();
}

@Test
public void whenSelectCLOB_thenCorrect() throws IOException {
    db.select("select document from SERVERLOG where id = 1")
      .dependsOn(create)
      .dependsOn(insert)
      .getAs(String.class)
      .toList()
      .toBlocking()
      .single();
    
    assertEquals(expectedDocument, actualDocument);
}

getStringFromInputStream() 是将 InputStream 转为 String 的工具方法。

6.2. BLOB 操作

与 CLOB 类似,但需传入字节数组:

@Before
public void setup() throws IOException {
    create = db.update(
      "CREATE TABLE IF NOT EXISTS " 
      + "SERVERLOG (id int primary key, document BLOB)")
        .count();
    
    InputStream actualInputStream
      = new FileInputStream("src/test/resources/actual_clob");
    actualDocument = getStringFromInputStream(actualInputStream);
    byte[] bytes = this.actualDocument.getBytes(StandardCharsets.UTF_8);
    
    InputStream expectedInputStream = new FileInputStream(
      "src/test/resources/expected_clob");
    expectedDocument = getStringFromInputStream(expectedInputStream);
    insert = db.update(
      "insert into SERVERLOG(id,document) values(?,?)")
      .parameter(1)
      .parameter(Database.toSentinelIfNull(bytes))
      .dependsOn(create)
      .count();
}

测试代码可复用上例的查询逻辑。

7. 事务管理

事务支持将多个操作捆绑执行,实现统一提交或回滚:

@Test
public void whenCommitTransaction_thenRecordUpdated() {
    Observable<Boolean> begin = db.beginTransaction();
    Observable<Integer> createStatement = db.update(
      "CREATE TABLE IF NOT EXISTS EMPLOYEE(id int primary key, name varchar(255))")
      .dependsOn(begin)
      .count();
    Observable<Integer> insertStatement = db.update(
      "INSERT INTO EMPLOYEE(id, name) VALUES(1, 'John')")
      .dependsOn(createStatement)
      .count();
    Observable<Integer> updateStatement = db.update(
      "UPDATE EMPLOYEE SET name = 'Tom' WHERE id = 1")
      .dependsOn(insertStatement)
      .count();
    Observable<Boolean> commit = db.commit(updateStatement);
    String name = db.select("select name from EMPLOYEE WHERE id = 1")
      .dependsOn(commit)
      .getAs(String.class)
      .toBlocking()
      .single();
    
    assertEquals("Tom", name);
}

事务操作流程:

  1. beginTransaction() 开启事务
  2. 后续操作自动加入同一事务
  3. commit() 提交或 rollback() 回滚

踩坑提示:务必在异常处理中调用 rollback(),避免数据不一致。

8. 获取自增主键

当表使用 auto_increment 字段时,可通过 returnGeneratedKeys() 获取生成值:

@Test
public void whenInsertAndReturnGeneratedKey_thenCorrect() {
    Integer key = db.update("INSERT INTO EMPLOYEE(name) VALUES('John')")
      .dependsOn(createStatement)
      .returnGeneratedKeys()
      .getAs(Integer.class)
      .count()
      .toBlocking()
      .single();
 
    assertThat(key).isEqualTo(1);
}

9. 总结

本文介绍了 rxjava-jdbc 的流式 API 用法,包括:

  • ✅ 基础增删改查操作
  • ✅ 自动映射(接口/类两种方式)
  • ✅ 大对象(CLOB/BLOB)处理
  • ✅ 事务管理
  • ✅ 自增主键获取

完整代码示例见 GitHub 仓库


原始标题:Introduction to rxjava-jdbc