2. 概述

在前一篇 Spring Batch 入门 文章中,我们将 Spring Batch 介绍为批量处理工具,并探讨了单线程单进程作业的配置与实现。

要实现并行处理的作业,Spring Batch 提供了多种选项。从更高层面看,并行处理有两种模式:

  1. 单进程多线程
  2. 多进程

本文将重点讲解 Step 分区技术(Partitioning),它既适用于单进程也适用于多进程作业。

3. 分区处理 Step

Spring Batch 的分区机制允许我们将 Step 的执行拆分为多个并行单元:

分区Step示意图 (示意图:展示包含分区 Step 的作业实现)

上图展示了一个带有分区 Step 的作业实现:

  • 存在一个名为 "Master" 的 Step,其执行被拆分为多个 "Slave" 步骤
  • 这些 Slave 步骤可以替代 Master 执行,最终结果保持不变
  • Master 和 Slave 本质上都是 Step 的实例
  • Slave 可以是远程服务或本地执行线程

如果需要,Master 可以向 Slave 传递数据。通过元数据(即 JobRepository),确保在单次作业执行中每个 Slave 只被执行一次。

以下是工作原理的序列图:

序列图 (序列图:展示分区执行流程)

如图所示:

  • PartitionStep 驱动整个执行流程
  • PartitionHandler 负责将 Master 的工作拆分到 Slave 步骤
  • 最右侧的 Step 即为 Slave 实例

4. Maven 依赖

Maven 依赖与 前文 一致:

<dependencies>
    <!-- Spring Core -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>5.3.20</version>
    </dependency>
    
    <!-- Spring Batch -->
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-core</artifactId>
        <version>4.3.7</version>
    </dependency>
    
    <!-- H2 Database -->
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <version>2.1.214</version>
    </dependency>
</dependencies>

5. 配置实现

我们将扩展前文的示例:将 5 个 CSV 文件的金融数据转换为 XML 文件,通过多线程实现。核心思路是使用单个作业配合 Step 分区,创建 5 个线程各处理一个 CSV 文件。

5.1 创建主作业

@Bean(name = "partitionerJob")
public Job partitionerJob(JobRepository jobRepository) 
  throws UnexpectedInputException, MalformedURLException, ParseException {
    return jobs.get("partitioningJob", jobRepository)
      .start(partitionStep())
      .build();
}

5.2 配置分区 Step

@Bean
public Step partitionStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) 
  throws UnexpectedInputException, ParseException {
    return new StepBuilder("partitionStep", jobRepository)
      .partitioner("slaveStep", partitioner())
      .step(slaveStep(jobRepository, transactionManager))
      .taskExecutor(taskExecutor())
      .build();
}

⚠️ 关键点:

  • 通过 StepBuilder 构造 PartitioningStep
  • 需指定 Slave 步骤和 Partitioner 实现类

5.3 自定义 Partitioner

Partitioner 接口用于定义每个 Slave 的输入参数。我们实现 CustomMultiResourcePartitioner 将文件名存入 ExecutionContext

public class CustomMultiResourcePartitioner implements Partitioner {
 
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> map = new HashMap<>(gridSize);
        int i = 0, k = 1;
        for (Resource resource : resources) {
            ExecutionContext context = new ExecutionContext();
            Assert.state(resource.exists(), "Resource does not exist: " 
              + resource);
            context.putString("fileName", resource.getFilename());
            context.putString("opFileName", "output"+k+++".xml");
            map.put("PARTITION_KEY" + i, context);
            i++;
        }
        return map;
    }
}

5.4 配置 Partitioner Bean

@Bean
public CustomMultiResourcePartitioner partitioner() {
    CustomMultiResourcePartitioner partitioner 
      = new CustomMultiResourcePartitioner();
    Resource[] resources;
    try {
        resources = resoursePatternResolver
          .getResources("file:src/main/resources/input/*.csv");
    } catch (IOException e) {
        throw new RuntimeException("I/O problems when resolving"
          + " the input file pattern.", e);
    }
    partitioner.setResources(resources);
    return partitioner;
}

5.5 创建 Slave Step

Slave Step 的 Reader/Writer 需从 StepExecutionContext 获取文件名。关键点:必须使用 @StepScope 注解确保每次 Step 执行时重新创建 Bean:

@StepScope
@Bean
public FlatFileItemReader<Transaction> itemReader(
  @Value("#{stepExecutionContext[fileName]}") String filename)
  throws UnexpectedInputException, ParseException {
 
    FlatFileItemReader<Transaction> reader 
      = new FlatFileItemReader<>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    String[] tokens 
      = {"username", "userid", "transactiondate", "amount"};
    tokenizer.setNames(tokens);
    reader.setResource(new ClassPathResource("input/" + filename));
    DefaultLineMapper<Transaction> lineMapper 
      = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
    reader.setLinesToSkip(1);
    reader.setLineMapper(lineMapper);
    return reader;
}
@Bean
@StepScope
public ItemWriter<Transaction> itemWriter(Marshaller marshaller, 
  @Value("#{stepExecutionContext[opFileName]}") String filename)
  throws MalformedURLException {
    StaxEventItemWriter<Transaction> itemWriter 
      = new StaxEventItemWriter<Transaction>();
    itemWriter.setMarshaller(marshaller);
    itemWriter.setRootTagName("transactionRecord");
    itemWriter.setResource(new FileSystemResource("src/main/resources/output/" + filename));
    return itemWriter;
}

5.6 定义 Slave Step

@Bean
public Step slaveStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) 
  throws UnexpectedInputException, ParseException {
    return new StepBuilder("slaveStep").<Transaction, Transaction> chunk(1, transactionManager)
      .reader(itemReader(null))  // 参数会被 ExecutionContext 覆盖
      .writer(itemWriter(marshaller(), null))
      .build();
}

6. 总结

本文演示了如何使用 Spring Batch 的 Partitioner 实现并行处理:

  • ✅ 通过分区技术将任务拆分为多个并行单元
  • ✅ 利用 ExecutionContext 在 Master/Slave 间传递参数
  • ✅ 使用 @StepScope 确保动态参数注入
  • ⚠️ 注意:Reader/Writer 必须是 Step 作用域,否则参数注入会失效

完整实现代码已上传至 GitHub,可直接运行测试。


原始标题:Spring Batch using Partitioner