1. 概述
遍历ResultSet
是JDBC查询中获取数据的常规操作。但在某些场景下,我们更希望以流式方式处理记录。本文将探讨几种使用Stream API处理ResultSet
的方案,特别适合处理无法一次性加载到内存的大数据集。
2. 使用JOOQ实现流式处理
JOOQ提供了便捷的流式处理能力。下面我们通过fetchStream()
方法将ResultSet
转换为流:
public Stream<CityRecord> getCitiesStreamUsingJOOQ(Connection connection)
throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
connection.setAutoCommit(false);
preparedStatement.setFetchSize(5000);
ResultSet resultSet = preparedStatement.executeQuery();
return DSL.using(connection)
.fetchStream(resultSet)
.map(r -> new CityRecord(r.get("NAME", String.class),
r.get("COUNTRY", String.class)));
}
✅ 关键点:
- 使用
fetchStream()
直接构建记录流 - 通过
map()
将JOOQ记录转换为业务对象 - 设置
fetchSize
控制内存使用
测试验证:
@Test
void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJOOQ_thenExpectedRecordsShouldBeReturned() throws SQLException {
Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
.getCitiesStreamUsingJOOQ(connection);
List<CityRecord> cities = cityRecords.toList();
assertThat(cities)
.containsExactly(
new CityRecord("London", "United Kingdom"),
new CityRecord("Sydney", "Australia"),
new CityRecord("Bucharest", "Romania"));
}
3. 使用jdbc-stream库实现
对于不想引入JOOQ的项目,可以使用轻量级的jdbc-stream库。首先添加依赖:
<dependency>
<groupId>com.github.juliomarcopineda</groupId>
<artifactId>jdbc-stream</artifactId>
<version>0.1.1</version>
</dependency>
实现代码:
public Stream<CityRecord> getCitiesStreamUsingJdbcStream(Connection connection)
throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
connection.setAutoCommit(false);
preparedStatement.setFetchSize(5000);
ResultSet resultSet = preparedStatement.executeQuery();
return JdbcStream.stream(resultSet)
.map(r -> {
try {
return createCityRecord(resultSet);
} catch (SQLException e) {
throw new RuntimeException(e);
}
});
}
⚠️ 注意事项:
- 底层使用Spliterator实现流式处理
- 需要手动处理SQLException(这里包装为RuntimeException)
- 与自定义实现逻辑类似但更简洁
测试验证:
@Test
void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJdbcStream_thenExpectedRecordsShouldBeReturned() throws SQLException {
Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
.getCitiesStreamUsingJdbcStream(connection);
List<CityRecord> cities = cityRecords.toList();
assertThat(cities)
.containsExactly(
new CityRecord("London", "United Kingdom"),
new CityRecord("Sydney", "Australia"),
new CityRecord("Bucharest", "Romania"));
}
4. 方案对比与总结
方案 | 优点 | 缺点 |
---|---|---|
JOOK | 功能强大,API丰富 | 依赖较重 |
jdbc-stream | 轻量级,专注流式处理 | 需要手动处理异常 |
✅ 适用场景:
- 处理GB级大数据集(避免OOM)
- 函数式编程风格的项目
- 需要链式操作的场景
❌ 不适用场景:
- 简单小数据量(直接List更高效)
- 需要复杂事务控制的场景
核心建议:流式处理是处理大数据集的利器,但要注意:
- 始终设置合理的
fetchSize
- 确保连接在流关闭后正确释放
- 异常处理要彻底(特别是SQLException)
这种方案特别适合需要流式处理和函数式编程风格的项目,能优雅解决大数据集处理问题。