1. 介绍

在 Spring Batch 中,CompositeItemReader 是一个将多个 ItemReader 实例组合成单一读取器的工具。当我们需要从多个数据源或按特定顺序读取数据时,这个组件特别有用。 比如,我们可能需要同时从数据库和文件读取记录,或者按特定顺序处理两个不同表的数据。

CompositeItemReader 简化了批处理作业中多读取器的管理,确保数据处理的效率和灵活性。本教程将带你实现一个 CompositeItemReader,并通过示例和测试用例验证其行为。

2. 理解 CompositeItemReader

CompositeItemReader 的工作原理是将读取过程委托给一组 ItemReader 实例。它会按定义顺序依次从每个读取器读取数据,确保数据被顺序处理。

这在以下场景中特别实用:

  • 从多个数据库或表读取数据
  • 合并文件和数据库的数据
  • 按特定顺序处理不同来源的数据

⚠️ 注意:CompositeItemReader 位于 org.springframework.batch.item.support 包下,自 Spring Batch 5.2.0 版本引入。

3. 实现 CompositeItemReader

我们通过一个示例来演示:从两个不同来源(平面文件和数据库)读取产品数据。目标是将两个来源的产品数据合并为单一处理流,确保所有可用记录被一起处理。

3.1. 创建 Product

首先需要定义 Product 类来表示数据结构。这个类封装了产品详情(ID、名称、库存、价格),作为 CSV 文件和数据库读取的统一数据模型

public class Product {
    private Long productId;
    private String productName;
    private Integer stock;
    private BigDecimal price;

    public Product(Long productId, String productName, Integer stock, BigDecimal price) {
        this.productId = productId;
        this.productName = productName;
        this.stock = stock;
        this.price = price;
    }

    // Getters and Setters
}

这个 Product 类将作为批处理作业中每条记录的数据载体。接下来我们为 CSV 文件和数据库分别创建 ItemReader

3.2. 产品数据的平面文件读取器

使用 FlatFileItemReader 从 CSV 文件读取数据。配置如下:

@Bean
public FlatFileItemReader<Product> fileReader() {
  return new FlatFileItemReaderBuilder<Product>()
    .name("fileReader")
    .resource(new ClassPathResource("products.csv"))
    .delimited()
    .names("productId", "productName", "stock", "price")
    .linesToSkip(1)
    .targetType(Product.class)
    .build();
}

关键配置说明:

  • delimited():指定字段分隔符(默认逗号)
  • names():定义 CSV 列名与 Product 类属性的映射
  • targetType(Product.class):将字段映射到类属性

3.3. 产品数据的数据库读取器

使用 JdbcCursorItemReader 从数据库的 products 表读取数据。该读取器通过 SQL 查询获取产品详情并映射到 Product

@Bean
public JdbcCursorItemReader<Product> dbReader(DataSource dataSource) {
  return new JdbcCursorItemReaderBuilder<Product>()
    .name("dbReader")
    .dataSource(dataSource())
    .sql("SELECT productid, productname, stock, price FROM products")
    .rowMapper((rs, rowNum) -> new Product(
      rs.getLong("productid"),
      rs.getString("productname"),
      rs.getInt("stock"),
      rs.getBigDecimal("price")))
    .build();
}

✅ 优势:JdbcCursorItemReader 使用游标逐行读取数据库记录,非常适合批处理场景。rowMapper() 函数负责将结果集列映射到 Product 类属性。

4. 使用 CompositeItemReader 组合读取器

现在将两个读取器通过 CompositeItemReader 整合:

@Bean
public CompositeItemReader<Product> compositeReader() {
    return new CompositeItemReader<>(Arrays.asList(fileReader(), dbReader()));
}

工作流程:

  1. FlatFileItemReader 先读取 CSV 文件中的产品记录
  2. 当文件数据处理完毕后,JdbcCursorItemReader 接管并开始从数据库读取数据

这种顺序处理方式确保了多源数据的无缝整合。

5. 配置批处理作业

定义读取器后,需要配置批处理作业本身。Spring Batch 作业由多个步骤组成:

@Bean
public Job productJob(JobRepository jobRepository, Step step) {
  return new JobBuilder("productJob", jobRepository)
    .start(step)
    .build();
}

@Bean
public Step step(ItemReader compositeReader, ItemWriter productWriter) {
  return new StepBuilder("productStep", jobRepository)
    .<Product, Product>chunk(10, transactionManager)
    .reader(compositeReader)
    .writer(productWriter)
    .build();
}

关键配置说明:

  • chunk(10):每处理 10 条记录提交一次事务
  • reader(compositeReader):使用组合读取器
  • writer(productWriter):指定数据写入器

productJob 负责定义作业流程,从 productStep 开始执行数据处理。整个流程通过分块处理确保高效性。

6. 运行批处理作业

使用 JobLauncher 编程式触发作业执行:

@Bean
public CommandLineRunner runJob() {
    return args -> {
        try {
            jobLauncher.run(productJob, new JobParametersBuilder()
              .addLong("time", System.currentTimeMillis())
              .toJobParameters());
        } catch (Exception e) {
            // 异常处理
        }
    };
}

❌ 踩坑提示:每次运行必须添加唯一参数(如时间戳),否则 Spring Batch 会认为这是重复作业而跳过执行。

7. 测试 CompositeItemReader

通过测试验证组合读取器的正确性,确保能正确处理 CSV 和数据库数据。

7.1. 准备测试数据

创建 CSV 文件(products.csv):

productId,productName,stock,price
101,Apple,50,1.99

向数据库插入测试记录:

@BeforeEach
public void setUp() {
    jdbcTemplate.update("INSERT INTO products (productid, productname, stock, price) VALUES (?, ?, ?, ?)",
      102, "Banana", 30, 1.49);
}

7.2. 测试顺序读取

验证组合读取器是否按正确顺序处理数据:

@Test
public void givenTwoReaders_whenRead_thenProcessProductsInOrder() throws Exception {
    StepExecution stepExecution = new StepExecution(
      "testStep",
      new JobExecution(1L, new JobParameters()),
      1L);
    ExecutionContext executionContext = stepExecution.getExecutionContext();
    compositeReader.open(executionContext);

    Product product1 = compositeReader.read();
    assertNotNull(product1);
    assertEquals(101, product1.getProductId());
    assertEquals("Apple", product1.getProductName());

    Product product2 = compositeReader.read();
    assertNotNull(product2);
    assertEquals(102, product2.getProductId());
    assertEquals("Banana", product2.getProductName());
}

✅ 验证点:

  1. 第一条记录来自 CSV 文件(Apple)
  2. 第二条记录来自数据库(Banana)

7.3. 测试空读取器处理

验证当某个读取器返回 null 时,组合读取器是否能正确跳过并继续处理:

@Test
public void givenMultipleReader_whenOneReaderReturnNull_thenProcessDataFromNextReader() throws Exception {
    ItemStreamReader<Product> emptyReader = mock(ItemStreamReader.class);
    when(emptyReader.read()).thenReturn(null);

    ItemStreamReader<Product> validReader = mock(ItemStreamReader.class);
    when(validReader.read()).thenReturn(new Product(103L, "Cherry", 20, BigDecimal.valueOf(2.99)), null);

    CompositeItemReader<Product> compositeReader = new CompositeItemReader<>(
      Arrays.asList(emptyReader, validReader));

    Product product = compositeReader.read();
    assertNotNull(product);
    assertEquals(103, product.getProductId());
    assertEquals("Cherry", product.getProductName());
}

⚠️ 关键行为:组合读取器会自动跳过返回 null 的读取器,继续从下一个有效读取器获取数据。

8. 总结

本文演示了如何实现和测试 CompositeItemReader,使其能够按特定顺序处理多源数据。通过链式组合读取器,我们可以灵活处理文件、数据库等不同来源的数据流。这种模式在需要整合异构数据源的批处理场景中尤其实用,既简化了代码结构,又保证了处理流程的可靠性。


原始标题:Composite Item Reader in Spring Batch | Baeldung