1. 介绍
在 Spring Batch 中,CompositeItemReader 是一个将多个 ItemReader 实例组合成单一读取器的工具。当我们需要从多个数据源或按特定顺序读取数据时,这个组件特别有用。 比如,我们可能需要同时从数据库和文件读取记录,或者按特定顺序处理两个不同表的数据。
CompositeItemReader 简化了批处理作业中多读取器的管理,确保数据处理的效率和灵活性。本教程将带你实现一个 CompositeItemReader,并通过示例和测试用例验证其行为。
2. 理解 CompositeItemReader
CompositeItemReader 的工作原理是将读取过程委托给一组 ItemReader 实例。它会按定义顺序依次从每个读取器读取数据,确保数据被顺序处理。
这在以下场景中特别实用:
- 从多个数据库或表读取数据
- 合并文件和数据库的数据
- 按特定顺序处理不同来源的数据
⚠️ 注意:CompositeItemReader 位于 org.springframework.batch.item.support 包下,自 Spring Batch 5.2.0 版本引入。
3. 实现 CompositeItemReader
我们通过一个示例来演示:从两个不同来源(平面文件和数据库)读取产品数据。目标是将两个来源的产品数据合并为单一处理流,确保所有可用记录被一起处理。
3.1. 创建 Product 类
首先需要定义 Product 类来表示数据结构。这个类封装了产品详情(ID、名称、库存、价格),作为 CSV 文件和数据库读取的统一数据模型:
public class Product {
private Long productId;
private String productName;
private Integer stock;
private BigDecimal price;
public Product(Long productId, String productName, Integer stock, BigDecimal price) {
this.productId = productId;
this.productName = productName;
this.stock = stock;
this.price = price;
}
// Getters and Setters
}
这个 Product 类将作为批处理作业中每条记录的数据载体。接下来我们为 CSV 文件和数据库分别创建 ItemReader。
3.2. 产品数据的平面文件读取器
使用 FlatFileItemReader 从 CSV 文件读取数据。配置如下:
@Bean
public FlatFileItemReader<Product> fileReader() {
return new FlatFileItemReaderBuilder<Product>()
.name("fileReader")
.resource(new ClassPathResource("products.csv"))
.delimited()
.names("productId", "productName", "stock", "price")
.linesToSkip(1)
.targetType(Product.class)
.build();
}
关键配置说明:
-
delimited()
:指定字段分隔符(默认逗号) -
names()
:定义 CSV 列名与 Product 类属性的映射 -
targetType(Product.class)
:将字段映射到类属性
3.3. 产品数据的数据库读取器
使用 JdbcCursorItemReader 从数据库的 products 表读取数据。该读取器通过 SQL 查询获取产品详情并映射到 Product 类:
@Bean
public JdbcCursorItemReader<Product> dbReader(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<Product>()
.name("dbReader")
.dataSource(dataSource())
.sql("SELECT productid, productname, stock, price FROM products")
.rowMapper((rs, rowNum) -> new Product(
rs.getLong("productid"),
rs.getString("productname"),
rs.getInt("stock"),
rs.getBigDecimal("price")))
.build();
}
✅ 优势:JdbcCursorItemReader 使用游标逐行读取数据库记录,非常适合批处理场景。rowMapper()
函数负责将结果集列映射到 Product 类属性。
4. 使用 CompositeItemReader 组合读取器
现在将两个读取器通过 CompositeItemReader 整合:
@Bean
public CompositeItemReader<Product> compositeReader() {
return new CompositeItemReader<>(Arrays.asList(fileReader(), dbReader()));
}
工作流程:
- FlatFileItemReader 先读取 CSV 文件中的产品记录
- 当文件数据处理完毕后,JdbcCursorItemReader 接管并开始从数据库读取数据
这种顺序处理方式确保了多源数据的无缝整合。
5. 配置批处理作业
定义读取器后,需要配置批处理作业本身。Spring Batch 作业由多个步骤组成:
@Bean
public Job productJob(JobRepository jobRepository, Step step) {
return new JobBuilder("productJob", jobRepository)
.start(step)
.build();
}
@Bean
public Step step(ItemReader compositeReader, ItemWriter productWriter) {
return new StepBuilder("productStep", jobRepository)
.<Product, Product>chunk(10, transactionManager)
.reader(compositeReader)
.writer(productWriter)
.build();
}
关键配置说明:
-
chunk(10)
:每处理 10 条记录提交一次事务 -
reader(compositeReader)
:使用组合读取器 -
writer(productWriter)
:指定数据写入器
productJob
负责定义作业流程,从 productStep
开始执行数据处理。整个流程通过分块处理确保高效性。
6. 运行批处理作业
使用 JobLauncher 编程式触发作业执行:
@Bean
public CommandLineRunner runJob() {
return args -> {
try {
jobLauncher.run(productJob, new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters());
} catch (Exception e) {
// 异常处理
}
};
}
❌ 踩坑提示:每次运行必须添加唯一参数(如时间戳),否则 Spring Batch 会认为这是重复作业而跳过执行。
7. 测试 CompositeItemReader
通过测试验证组合读取器的正确性,确保能正确处理 CSV 和数据库数据。
7.1. 准备测试数据
创建 CSV 文件(products.csv
):
productId,productName,stock,price
101,Apple,50,1.99
向数据库插入测试记录:
@BeforeEach
public void setUp() {
jdbcTemplate.update("INSERT INTO products (productid, productname, stock, price) VALUES (?, ?, ?, ?)",
102, "Banana", 30, 1.49);
}
7.2. 测试顺序读取
验证组合读取器是否按正确顺序处理数据:
@Test
public void givenTwoReaders_whenRead_thenProcessProductsInOrder() throws Exception {
StepExecution stepExecution = new StepExecution(
"testStep",
new JobExecution(1L, new JobParameters()),
1L);
ExecutionContext executionContext = stepExecution.getExecutionContext();
compositeReader.open(executionContext);
Product product1 = compositeReader.read();
assertNotNull(product1);
assertEquals(101, product1.getProductId());
assertEquals("Apple", product1.getProductName());
Product product2 = compositeReader.read();
assertNotNull(product2);
assertEquals(102, product2.getProductId());
assertEquals("Banana", product2.getProductName());
}
✅ 验证点:
- 第一条记录来自 CSV 文件(Apple)
- 第二条记录来自数据库(Banana)
7.3. 测试空读取器处理
验证当某个读取器返回 null 时,组合读取器是否能正确跳过并继续处理:
@Test
public void givenMultipleReader_whenOneReaderReturnNull_thenProcessDataFromNextReader() throws Exception {
ItemStreamReader<Product> emptyReader = mock(ItemStreamReader.class);
when(emptyReader.read()).thenReturn(null);
ItemStreamReader<Product> validReader = mock(ItemStreamReader.class);
when(validReader.read()).thenReturn(new Product(103L, "Cherry", 20, BigDecimal.valueOf(2.99)), null);
CompositeItemReader<Product> compositeReader = new CompositeItemReader<>(
Arrays.asList(emptyReader, validReader));
Product product = compositeReader.read();
assertNotNull(product);
assertEquals(103, product.getProductId());
assertEquals("Cherry", product.getProductName());
}
⚠️ 关键行为:组合读取器会自动跳过返回 null 的读取器,继续从下一个有效读取器获取数据。
8. 总结
本文演示了如何实现和测试 CompositeItemReader,使其能够按特定顺序处理多源数据。通过链式组合读取器,我们可以灵活处理文件、数据库等不同来源的数据流。这种模式在需要整合异构数据源的批处理场景中尤其实用,既简化了代码结构,又保证了处理流程的可靠性。