1. 介绍
本文将探讨如何在Spring Batch中实现一个Job,包含单个Reader、多个Processor和多个Writer。这种架构适用于需要一次读取数据、进行不同方式处理、并将结果写入多个目标的场景。
2. 搭建Spring Batch项目
首先需要在pom.xml
中添加以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>3.4.2</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
这些依赖提供了:
- Spring Batch核心功能
- Spring Data JPA进行数据库操作
- H2内存数据库用于开发
2.1. 准备输入CSV文件
创建名为customers.csv
的示例文件:
id,name,email,type
1,John,john.doe@example.com,A
2,Alice,alice.smith@example.com,B
3,Bob,bob.jones@example.com,A
4,Eve,eve.williams@example.com,B
文件包含四个字段:唯一ID、姓名、邮箱和类型字段(用于决定处理路径)。将该文件存放在src/main/resources
目录。
2.2. 创建数据模型
定义Customer实体类:
@Entity
public class Customer {
@Id
private Long id;
private String name;
private String email;
private String type;
// 构造方法、getter和setter
}
3. 实现CSV Reader
使用Spring Batch的FlatFileItemReader
读取CSV文件:
@Bean
public FlatFileItemReader<Customer> customerReader() {
return new FlatFileItemReaderBuilder<Customer>()
.name("customerItemReader")
.resource(new ClassPathResource("customers.csv"))
.delimited()
.names("id", "name", "email", "type")
.fieldSetMapper(new BeanWrapperFieldSetMapper<Customer>() {{
setTargetType(Customer.class);
}})
.build();
}
✅ 此配置会:
- 逐行解析CSV文件
- 将每条记录映射为Customer对象
- 自动处理文件开关操作
- 按块处理数据提升内存效率
⚠️ names()
方法中的字段名必须与CSV表头和Customer类属性完全匹配。
4. 创建条件处理器
实现两个独立的处理器和一个路由机制:
public class TypeAProcessor implements ItemProcessor<Customer, Customer> {
@Override
public Customer process(Customer customer) {
customer.setName(customer.getName().toUpperCase());
customer.setEmail("A_" + customer.getEmail());
return customer;
}
}
public class TypeBProcessor implements ItemProcessor<Customer, Customer> {
@Override
public Customer process(Customer customer) {
customer.setName(customer.getName().toLowerCase());
customer.setEmail("B_" + customer.getEmail());
return customer;
}
}
✅ 处理器特点:
- 都实现
ItemProcessor
接口 process()
方法接收Customer对象并返回修改后版本- TypeA处理器:姓名转大写,邮箱添加"A_"前缀
- TypeB处理器:姓名转小写,邮箱添加"B_"前缀
5. 实现处理器路由器
创建路由组件根据类型字段选择处理器:
public class CustomerProcessorRouter implements ItemProcessor<Customer, Customer> {
private final TypeAProcessor typeAProcessor;
private final TypeBProcessor typeBProcessor;
public CustomerProcessorRouter(TypeAProcessor typeAProcessor,
TypeBProcessor typeBProcessor) {
this.typeAProcessor = typeAProcessor;
this.typeBProcessor = typeBProcessor;
}
@Override
public Customer process(Customer customer) throws Exception {
if ("A".equals(customer.getType())) {
return typeAProcessor.process(customer);
} else if ("B".equals(customer.getType())) {
return typeBProcessor.process(customer);
}
return customer;
}
}
✅ 路由器设计优势:
- 保持处理逻辑分离
- 在Job定义中维持单一处理步骤
- 根据type字段动态选择处理器
6. 配置多个Writer
6.1. 数据库Writer配置
@Bean
public JpaItemWriter<Customer> dbWriter(EntityManagerFactory entityManagerFactory) {
JpaItemWriter<Customer> writer = new JpaItemWriter<>();
writer.setEntityManagerFactory(entityManagerFactory);
return writer;
}
✅ 此Writer特点:
- 使用JPA持久化Customer对象
- 自动处理数据库插入和更新操作
- 依赖注入EntityManagerFactory
6.2. 平面文件Writer配置
@Bean
public FlatFileItemWriter<Customer> fileWriter() {
return new FlatFileItemWriterBuilder<Customer>()
.name("customerItemWriter")
.resource(new FileSystemResource("output/processed_customers.txt"))
.delimited()
.delimiter(",")
.names("id", "name", "email", "type")
.build();
}
✅ 此Writer特点:
- 生成CSV格式输出文件
- 使用逗号分隔符
- 字段名与Customer实体属性匹配
- 输出路径:
output/processed_customers.txt
6.3. 组合Writer实现
使用CompositeItemWriter
实现多目标写入:
@Bean
public CompositeItemWriter<Customer> compositeWriter(
JpaItemWriter<Customer> dbWriter,
FlatFileItemWriter<Customer> fileWriter) {
CompositeItemWriter<Customer> writer = new CompositeItemWriter<>();
writer.setDelegates(List.of(dbWriter, fileWriter));
return writer;
}
✅ 组合Writer特点:
- 作为多个Writer的代理
- 确保每个处理项写入所有目标
- 委托顺序决定写入顺序
7. 配置Step和Job
@Bean
public Job processCustomersJob(JobBuilderFactory jobs,
StepBuilderFactory steps,
FlatFileItemReader<Customer> reader,
CustomerProcessorRouter processor,
CompositeItemWriter<Customer> writer) {
Step step = steps.get("processCustomersStep")
.<Customer, Customer>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
return jobs.get("customerProcessingJob")
.start(step)
.build();
}
✅ Job配置说明:
- 单Step处理流程
- 每次处理10条记录(chunk size)
- 处理链:Reader → Router Processor → Composite Writer
- 同时写入数据库和文件
8. 运行和测试Job
集成测试验证不同类型客户的处理结果:
List<Customer> dbCustomers = jdbcTemplate.query(
"SELECT id, name, email, type FROM customer WHERE type = 'A'",
(rs, rowNum) -> new Customer(
rs.getLong("id"),
rs.getString("name"),
rs.getString("email"),
rs.getString("type"))
);
assertFalse(dbCustomers.isEmpty());
dbCustomers.forEach(c -> {
assertEquals(c.getName(), c.getName().toUpperCase());
assertTrue(c.getEmail().startsWith("A_"));
});
Path outputFile = Paths.get("output/processed_customers.txt");
assertTrue(Files.exists(outputFile));
List<String> lines = Files.readAllLines(outputFile);
boolean hasTypeB = lines.stream().anyMatch(line -> line.endsWith(",B"));
assertTrue(hasTypeB);
lines.forEach(line -> {
String[] parts = line.split(",");
if ("B".equals(parts[3])) {
assertEquals(parts[1], parts[1].toLowerCase());
assertTrue(parts[2].startsWith("B_"));
}
});
✅ 测试验证要点:
- 数据库中类型A客户:
- 姓名转为大写
- 邮箱添加"A_"前缀
- 输出文件中类型B客户:
- 姓名转为小写
- 邮箱添加"B_"前缀
- 确保输出文件存在且包含处理记录
9. 总结
本文介绍了在Spring Batch中实现单Reader多Processor多Writer的完整方案:
- ✅ 从CSV文件读取数据
- ✅ 根据内容路由到不同处理器
- ✅ 将结果同时写入数据库和文件
这种架构特别适合需要多路径处理和多目标写入的ETL场景,通过组合组件实现灵活的数据处理流程。
完整源码可在GitHub获取。