1. 介绍

本文将探讨如何在Spring Batch中实现一个Job,包含单个Reader、多个Processor和多个Writer。这种架构适用于需要一次读取数据、进行不同方式处理、并将结果写入多个目标的场景。

2. 搭建Spring Batch项目

首先需要在pom.xml中添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
    <version>3.5.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
    <version>3.4.2</version>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>

这些依赖提供了:

  • Spring Batch核心功能
  • Spring Data JPA进行数据库操作
  • H2内存数据库用于开发

2.1. 准备输入CSV文件

创建名为customers.csv的示例文件:

id,name,email,type
1,John,john.doe@example.com,A
2,Alice,alice.smith@example.com,B
3,Bob,bob.jones@example.com,A
4,Eve,eve.williams@example.com,B

文件包含四个字段:唯一ID、姓名、邮箱和类型字段(用于决定处理路径)。将该文件存放在src/main/resources目录。

2.2. 创建数据模型

定义Customer实体类:

@Entity
public class Customer {
    @Id
    private Long id;
    private String name;
    private String email;
    private String type;
    
    // 构造方法、getter和setter
}

3. 实现CSV Reader

使用Spring Batch的FlatFileItemReader读取CSV文件:

@Bean
public FlatFileItemReader<Customer> customerReader() {
    return new FlatFileItemReaderBuilder<Customer>()
      .name("customerItemReader")
      .resource(new ClassPathResource("customers.csv"))
      .delimited()
      .names("id", "name", "email", "type")
      .fieldSetMapper(new BeanWrapperFieldSetMapper<Customer>() {{
        setTargetType(Customer.class);
      }})
      .build();
}

✅ 此配置会:

  • 逐行解析CSV文件
  • 将每条记录映射为Customer对象
  • 自动处理文件开关操作
  • 按块处理数据提升内存效率

⚠️ names()方法中的字段名必须与CSV表头和Customer类属性完全匹配。

4. 创建条件处理器

实现两个独立的处理器和一个路由机制:

public class TypeAProcessor implements ItemProcessor<Customer, Customer> {
    @Override
    public Customer process(Customer customer) {
        customer.setName(customer.getName().toUpperCase());
        customer.setEmail("A_" + customer.getEmail());
        return customer;
    }
}

public class TypeBProcessor implements ItemProcessor<Customer, Customer> {
    @Override
    public Customer process(Customer customer) {
        customer.setName(customer.getName().toLowerCase());
        customer.setEmail("B_" + customer.getEmail());
        return customer;
    }
}

✅ 处理器特点:

  • 都实现ItemProcessor接口
  • process()方法接收Customer对象并返回修改后版本
  • TypeA处理器:姓名转大写,邮箱添加"A_"前缀
  • TypeB处理器:姓名转小写,邮箱添加"B_"前缀

5. 实现处理器路由器

创建路由组件根据类型字段选择处理器:

public class CustomerProcessorRouter implements ItemProcessor<Customer, Customer> {
    private final TypeAProcessor typeAProcessor;
    private final TypeBProcessor typeBProcessor;

    public CustomerProcessorRouter(TypeAProcessor typeAProcessor, 
      TypeBProcessor typeBProcessor) {
        this.typeAProcessor = typeAProcessor;
        this.typeBProcessor = typeBProcessor;
    }

    @Override
    public Customer process(Customer customer) throws Exception {
        if ("A".equals(customer.getType())) {
            return typeAProcessor.process(customer);
        } else if ("B".equals(customer.getType())) {
            return typeBProcessor.process(customer);
        }
        return customer;
    }
}

✅ 路由器设计优势:

  • 保持处理逻辑分离
  • 在Job定义中维持单一处理步骤
  • 根据type字段动态选择处理器

6. 配置多个Writer

6.1. 数据库Writer配置

@Bean
public JpaItemWriter<Customer> dbWriter(EntityManagerFactory entityManagerFactory) {
    JpaItemWriter<Customer> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(entityManagerFactory);
    return writer;
}

✅ 此Writer特点:

  • 使用JPA持久化Customer对象
  • 自动处理数据库插入和更新操作
  • 依赖注入EntityManagerFactory

6.2. 平面文件Writer配置

@Bean
public FlatFileItemWriter<Customer> fileWriter() {
    return new FlatFileItemWriterBuilder<Customer>()
      .name("customerItemWriter")
      .resource(new FileSystemResource("output/processed_customers.txt"))
      .delimited()
      .delimiter(",")
      .names("id", "name", "email", "type")
      .build();
}

✅ 此Writer特点:

  • 生成CSV格式输出文件
  • 使用逗号分隔符
  • 字段名与Customer实体属性匹配
  • 输出路径:output/processed_customers.txt

6.3. 组合Writer实现

使用CompositeItemWriter实现多目标写入:

@Bean
public CompositeItemWriter<Customer> compositeWriter(
  JpaItemWriter<Customer> dbWriter,
  FlatFileItemWriter<Customer> fileWriter) {
    CompositeItemWriter<Customer> writer = new CompositeItemWriter<>();
    writer.setDelegates(List.of(dbWriter, fileWriter));
    return writer;
}

✅ 组合Writer特点:

  • 作为多个Writer的代理
  • 确保每个处理项写入所有目标
  • 委托顺序决定写入顺序

7. 配置Step和Job

@Bean
public Job processCustomersJob(JobBuilderFactory jobs,
  StepBuilderFactory steps,
  FlatFileItemReader<Customer> reader,
  CustomerProcessorRouter processor,
  CompositeItemWriter<Customer> writer) {

    Step step = steps.get("processCustomersStep")
      .<Customer, Customer>chunk(10)
      .reader(reader)
      .processor(processor)
      .writer(writer)
      .build();

    return jobs.get("customerProcessingJob")
      .start(step)
      .build();
}

✅ Job配置说明:

  • 单Step处理流程
  • 每次处理10条记录(chunk size)
  • 处理链:Reader → Router Processor → Composite Writer
  • 同时写入数据库和文件

8. 运行和测试Job

集成测试验证不同类型客户的处理结果:

List<Customer> dbCustomers = jdbcTemplate.query(
    "SELECT id, name, email, type FROM customer WHERE type = 'A'",
    (rs, rowNum) -> new Customer(
        rs.getLong("id"),
        rs.getString("name"),
        rs.getString("email"),
        rs.getString("type"))
);

assertFalse(dbCustomers.isEmpty());

dbCustomers.forEach(c -> {
    assertEquals(c.getName(), c.getName().toUpperCase());
    assertTrue(c.getEmail().startsWith("A_"));
});

Path outputFile = Paths.get("output/processed_customers.txt");
assertTrue(Files.exists(outputFile));

List<String> lines = Files.readAllLines(outputFile);

boolean hasTypeB = lines.stream().anyMatch(line -> line.endsWith(",B"));
assertTrue(hasTypeB);

lines.forEach(line -> {
    String[] parts = line.split(",");
    if ("B".equals(parts[3])) {
        assertEquals(parts[1], parts[1].toLowerCase());
        assertTrue(parts[2].startsWith("B_"));
    }
});

✅ 测试验证要点:

  • 数据库中类型A客户:
    • 姓名转为大写
    • 邮箱添加"A_"前缀
  • 输出文件中类型B客户:
    • 姓名转为小写
    • 邮箱添加"B_"前缀
  • 确保输出文件存在且包含处理记录

9. 总结

本文介绍了在Spring Batch中实现单Reader多Processor多Writer的完整方案:

  1. ✅ 从CSV文件读取数据
  2. ✅ 根据内容路由到不同处理器
  3. ✅ 将结果同时写入数据库和文件

这种架构特别适合需要多路径处理和多目标写入的ETL场景,通过组合组件实现灵活的数据处理流程。

完整源码可在GitHub获取。


原始标题:One Reader with Multiple Processors and Writers in Spring Batch | Baeldung