2. 概述
在前一篇 Spring Batch 入门 文章中,我们将 Spring Batch 介绍为批量处理工具,并探讨了单线程单进程作业的配置与实现。
要实现并行处理的作业,Spring Batch 提供了多种选项。从更高层面看,并行处理有两种模式:
- 单进程多线程
- 多进程
本文将重点讲解 Step 分区技术(Partitioning),它既适用于单进程也适用于多进程作业。
3. 分区处理 Step
Spring Batch 的分区机制允许我们将 Step 的执行拆分为多个并行单元:
(示意图:展示包含分区 Step 的作业实现)
上图展示了一个带有分区 Step 的作业实现:
- 存在一个名为 "Master" 的 Step,其执行被拆分为多个 "Slave" 步骤
- 这些 Slave 步骤可以替代 Master 执行,最终结果保持不变
- Master 和 Slave 本质上都是 Step 的实例
- Slave 可以是远程服务或本地执行线程
如果需要,Master 可以向 Slave 传递数据。通过元数据(即 JobRepository),确保在单次作业执行中每个 Slave 只被执行一次。
以下是工作原理的序列图:
(序列图:展示分区执行流程)
如图所示:
PartitionStep
驱动整个执行流程PartitionHandler
负责将 Master 的工作拆分到 Slave 步骤- 最右侧的 Step 即为 Slave 实例
4. Maven 依赖
Maven 依赖与 前文 一致:
<dependencies>
<!-- Spring Core -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.3.20</version>
</dependency>
<!-- Spring Batch -->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>4.3.7</version>
</dependency>
<!-- H2 Database -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.1.214</version>
</dependency>
</dependencies>
5. 配置实现
我们将扩展前文的示例:将 5 个 CSV 文件的金融数据转换为 XML 文件,通过多线程实现。核心思路是使用单个作业配合 Step 分区,创建 5 个线程各处理一个 CSV 文件。
5.1 创建主作业
@Bean(name = "partitionerJob")
public Job partitionerJob(JobRepository jobRepository)
throws UnexpectedInputException, MalformedURLException, ParseException {
return jobs.get("partitioningJob", jobRepository)
.start(partitionStep())
.build();
}
5.2 配置分区 Step
@Bean
public Step partitionStep(JobRepository jobRepository, PlatformTransactionManager transactionManager)
throws UnexpectedInputException, ParseException {
return new StepBuilder("partitionStep", jobRepository)
.partitioner("slaveStep", partitioner())
.step(slaveStep(jobRepository, transactionManager))
.taskExecutor(taskExecutor())
.build();
}
⚠️ 关键点:
- 通过
StepBuilder
构造PartitioningStep
- 需指定 Slave 步骤和
Partitioner
实现类
5.3 自定义 Partitioner
Partitioner
接口用于定义每个 Slave 的输入参数。我们实现 CustomMultiResourcePartitioner
将文件名存入 ExecutionContext
:
public class CustomMultiResourcePartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<>(gridSize);
int i = 0, k = 1;
for (Resource resource : resources) {
ExecutionContext context = new ExecutionContext();
Assert.state(resource.exists(), "Resource does not exist: "
+ resource);
context.putString("fileName", resource.getFilename());
context.putString("opFileName", "output"+k+++".xml");
map.put("PARTITION_KEY" + i, context);
i++;
}
return map;
}
}
5.4 配置 Partitioner Bean
@Bean
public CustomMultiResourcePartitioner partitioner() {
CustomMultiResourcePartitioner partitioner
= new CustomMultiResourcePartitioner();
Resource[] resources;
try {
resources = resoursePatternResolver
.getResources("file:src/main/resources/input/*.csv");
} catch (IOException e) {
throw new RuntimeException("I/O problems when resolving"
+ " the input file pattern.", e);
}
partitioner.setResources(resources);
return partitioner;
}
5.5 创建 Slave Step
Slave Step 的 Reader/Writer 需从 StepExecutionContext
获取文件名。关键点:必须使用 @StepScope
注解确保每次 Step 执行时重新创建 Bean:
@StepScope
@Bean
public FlatFileItemReader<Transaction> itemReader(
@Value("#{stepExecutionContext[fileName]}") String filename)
throws UnexpectedInputException, ParseException {
FlatFileItemReader<Transaction> reader
= new FlatFileItemReader<>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
String[] tokens
= {"username", "userid", "transactiondate", "amount"};
tokenizer.setNames(tokens);
reader.setResource(new ClassPathResource("input/" + filename));
DefaultLineMapper<Transaction> lineMapper
= new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
reader.setLinesToSkip(1);
reader.setLineMapper(lineMapper);
return reader;
}
@Bean
@StepScope
public ItemWriter<Transaction> itemWriter(Marshaller marshaller,
@Value("#{stepExecutionContext[opFileName]}") String filename)
throws MalformedURLException {
StaxEventItemWriter<Transaction> itemWriter
= new StaxEventItemWriter<Transaction>();
itemWriter.setMarshaller(marshaller);
itemWriter.setRootTagName("transactionRecord");
itemWriter.setResource(new FileSystemResource("src/main/resources/output/" + filename));
return itemWriter;
}
5.6 定义 Slave Step
@Bean
public Step slaveStep(JobRepository jobRepository, PlatformTransactionManager transactionManager)
throws UnexpectedInputException, ParseException {
return new StepBuilder("slaveStep").<Transaction, Transaction> chunk(1, transactionManager)
.reader(itemReader(null)) // 参数会被 ExecutionContext 覆盖
.writer(itemWriter(marshaller(), null))
.build();
}
6. 总结
本文演示了如何使用 Spring Batch 的 Partitioner 实现并行处理:
- ✅ 通过分区技术将任务拆分为多个并行单元
- ✅ 利用
ExecutionContext
在 Master/Slave 间传递参数 - ✅ 使用
@StepScope
确保动态参数注入 - ⚠️ 注意:Reader/Writer 必须是 Step 作用域,否则参数注入会失效
完整实现代码已上传至 GitHub,可直接运行测试。