1. 概述

遍历ResultSet是JDBC查询中获取数据的常规操作。但在某些场景下,我们更希望以流式方式处理记录。本文将探讨几种使用Stream API处理ResultSet的方案,特别适合处理无法一次性加载到内存的大数据集。

2. 使用JOOQ实现流式处理

JOOQ提供了便捷的流式处理能力。下面我们通过fetchStream()方法将ResultSet转换为流:

public Stream<CityRecord> getCitiesStreamUsingJOOQ(Connection connection)
        throws SQLException {

    PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
    connection.setAutoCommit(false);
    preparedStatement.setFetchSize(5000);
    ResultSet resultSet = preparedStatement.executeQuery();

    return DSL.using(connection)
      .fetchStream(resultSet)
      .map(r -> new CityRecord(r.get("NAME", String.class),
        r.get("COUNTRY", String.class)));
}

✅ 关键点:

  • 使用fetchStream()直接构建记录流
  • 通过map()将JOOQ记录转换为业务对象
  • 设置fetchSize控制内存使用

测试验证:

@Test
void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJOOQ_thenExpectedRecordsShouldBeReturned() throws SQLException {

    Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
      .getCitiesStreamUsingJOOQ(connection);
    List<CityRecord> cities = cityRecords.toList();

    assertThat(cities)
      .containsExactly(
        new CityRecord("London", "United Kingdom"),
        new CityRecord("Sydney", "Australia"),
        new CityRecord("Bucharest", "Romania"));
}

3. 使用jdbc-stream库实现

对于不想引入JOOQ的项目,可以使用轻量级的jdbc-stream库。首先添加依赖:

<dependency>
    <groupId>com.github.juliomarcopineda</groupId>
    <artifactId>jdbc-stream</artifactId>
    <version>0.1.1</version>
</dependency>

实现代码:

public Stream<CityRecord> getCitiesStreamUsingJdbcStream(Connection connection)
        throws SQLException {

    PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
    connection.setAutoCommit(false);
    preparedStatement.setFetchSize(5000);
    ResultSet resultSet = preparedStatement.executeQuery();

    return JdbcStream.stream(resultSet)
      .map(r -> {
          try {
              return createCityRecord(resultSet);
          } catch (SQLException e) {
              throw new RuntimeException(e);
          }
      });
}

⚠️ 注意事项:

  • 底层使用Spliterator实现流式处理
  • 需要手动处理SQLException(这里包装为RuntimeException)
  • 与自定义实现逻辑类似但更简洁

测试验证:

@Test
void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJdbcStream_thenExpectedRecordsShouldBeReturned() throws SQLException {

    Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
            .getCitiesStreamUsingJdbcStream(connection);
    List<CityRecord> cities = cityRecords.toList();

    assertThat(cities)
      .containsExactly(
        new CityRecord("London", "United Kingdom"),
        new CityRecord("Sydney", "Australia"),
        new CityRecord("Bucharest", "Romania"));
}

4. 方案对比与总结

方案 优点 缺点
JOOK 功能强大,API丰富 依赖较重
jdbc-stream 轻量级,专注流式处理 需要手动处理异常

✅ 适用场景:

  • 处理GB级大数据集(避免OOM)
  • 函数式编程风格的项目
  • 需要链式操作的场景

❌ 不适用场景:

  • 简单小数据量(直接List更高效)
  • 需要复杂事务控制的场景

核心建议:流式处理是处理大数据集的利器,但要注意:

  1. 始终设置合理的fetchSize
  2. 确保连接在流关闭后正确释放
  3. 异常处理要彻底(特别是SQLException)

这种方案特别适合需要流式处理和函数式编程风格的项目,能优雅解决大数据集处理问题。


原始标题:Processing JDBC ResultSet With Stream API | Baeldung