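1. Introduction

Spring Batch provides two different ways to implement a job: tasklets and chunks. In this article, we'll configure and implement both approaches using a simple, realistic example.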

2. Dependencies

Let's start by adding the required dependencies:

<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-core</artifactId>
    <version>5.1.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-test</artifactId>
    <version>5.1.1</version>
    <scope>test</scope>
</dependency>

The latest versions of spring-batch-core and spring-batch-test are available on Maven Central.

3. Our Use Case

Consider a CSV file with the following content:

Mae Hodges,10/22/1972
Gary Potter,02/22/1953
Betty Wise,02/17/1968
Wayne Rose,04/06/1977
Adam Caldwell,09/27/1995
Lucille Phillips,05/14/1992

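The first column of each line is a person's name and the second is their date of birth. Our goal is to generate a new CSV file containing each name and age. Because the age is computed against the current date, the values below reflect the date on which the example was run: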

Mae Hodges,45
Gary Potter,64
Betty Wise,49
Wayne Rose,40
Adam Caldwell,22
Lucille Phillips,25
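
The ages above can be reproduced with java.time. The following is a minimal sketch, assuming a fixed reference date of 2018-01-01 (our assumption, chosen so that the results match the sample output). The AgeExample class and its ageOn method are illustrative names that are not part of the project:

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

class AgeExample {

    // Number of whole years between the date of birth and the reference date
    static long ageOn(LocalDate dob, LocalDate referenceDate) {
        return ChronoUnit.YEARS.between(dob, referenceDate);
    }

    public static void main(String[] args) {
        // Assumed reference date; the job in this article uses LocalDate.now()
        LocalDate referenceDate = LocalDate.of(2018, 1, 1);

        System.out.println(ageOn(LocalDate.of(1972, 10, 22), referenceDate)); // Mae Hodges: 45
        System.out.println(ageOn(LocalDate.of(1995, 9, 27), referenceDate));  // Adam Caldwell: 22
    }
}
```

Since only completed years are counted, running the same calculation one day before a birthday yields the previous age.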

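Now let's implement the solution in both ways, starting with tasklets.

4. Tasklets Approach

4.1 Design

Tasklets are meant to perform a single task within a step. A job is made up of several steps that run one after another, and each step should perform only one well-defined task.

Our job consists of three steps:

  1. Read the lines from the input CSV file
  2. Calculate the age of every person in the file
  3. Write each name and age to a new CSV file

We'll create one class per step:

  • LinesReader reads the input file: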
    public class LinesReader implements Tasklet {
      // ...
    }
    
  • LinesProcessor calculates the ages:
    public class LinesProcessor implements Tasklet {
      // ...
    }
    
  • LinesWriter writes the results:
    public class LinesWriter implements Tasklet {
      // ...
    }
    

Every step implements the Tasklet interface, which requires an execute method:

@Override
public RepeatStatus execute(StepContribution stepContribution, 
  ChunkContext chunkContext) throws Exception {
    // ...
}

4.2 Configuration

Next, we add the configuration to the Spring application context:

@Configuration
public class TaskletsConfig {

    @Bean
    protected Step readLines(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("readLines", jobRepository)
          .tasklet(linesReader(), transactionManager)
          .build();
    }

    @Bean
    protected Step processLines(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("processLines", jobRepository)
          .tasklet(linesProcessor(), transactionManager)
          .build();
    }

    @Bean
    protected Step writeLines(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("writeLines", jobRepository)
          .tasklet(linesWriter(), transactionManager)
          .build();
    }

    @Bean
    public Job job(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new JobBuilder("taskletsJob", jobRepository)
          .start(readLines(jobRepository, transactionManager))
          .next(processLines(jobRepository, transactionManager))
          .next(writeLines(jobRepository, transactionManager))
          .build();
    }

    // ...
}

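With the job flow defined, we can now add the business logic.

4.3 Model and Utilities

Since we're handling the lines of a CSV file, we create a Line class: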

public class Line implements Serializable {

    private String name;
    private LocalDate dob;
    private Long age;

    // standard constructors, getters/setters and toString
}

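⚠️ Note: Line implements Serializable because it acts as a DTO between steps. The lines travel through the job's ExecutionContext, which Spring Batch persists in the job repository, so objects passed between steps must be serializable.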
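
To make the requirement concrete, here is a minimal sketch of a serialization round trip using plain JDK serialization. It is only an illustration: Person is a hypothetical stand-in for Line, and how Spring Batch actually persists the context depends on the configured serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.time.LocalDate;

class SerializationExample {

    // Simplified stand-in for the Line class (hypothetical, for illustration only)
    static class Person implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final LocalDate dob;

        Person(String name, LocalDate dob) {
            this.name = name;
            this.dob = dob;
        }
    }

    // Writes the object to a byte array and reads it back as a new instance
    static Person roundTrip(Person person) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(person);
            }
            try (ObjectInputStream in = new ObjectInputStream(
              new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Person) in.readObject();
            }
        } catch (IOException e) {
            // Includes NotSerializableException when a field type is not serializable
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Person copy = roundTrip(new Person("Mae Hodges", LocalDate.of(1972, 10, 22)));
        System.out.println(copy.name + "," + copy.dob);
    }
}
```

Removing "implements Serializable" from Person makes writeObject fail with a NotSerializableException, which is the kind of error the note above is meant to prevent.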

We use OpenCSV to read and write the files:

<dependency>
    <groupId>com.opencsv</groupId>
    <artifactId>opencsv</artifactId>
    <version>5.8</version>
</dependency>

We also create a FileUtils utility class:

public class FileUtils {

    public Line readLine() throws Exception {
        if (CSVReader == null) 
          initReader();
        String[] line = CSVReader.readNext();
        if (line == null) 
          return null;
        return new Line(
          line[0], 
          LocalDate.parse(
            line[1], 
            DateTimeFormatter.ofPattern("MM/dd/yyyy")));
    }

    public void writeLine(Line line) throws Exception {
        if (CSVWriter == null) 
          initWriter();
        String[] lineStr = new String[2];
        lineStr[0] = line.getName();
        lineStr[1] = line
          .getAge()
          .toString();
        CSVWriter.writeNext(lineStr);
    }

    // ...
}
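
The date handling in readLine can be tried in isolation. The sketch below assumes a plain String.split instead of OpenCSV so that it runs with the JDK alone; DobParsingExample and parseDob are illustrative names:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

class DobParsingExample {

    // Same pattern as in FileUtils: month/day/year
    private static final DateTimeFormatter DOB_FORMAT =
      DateTimeFormatter.ofPattern("MM/dd/yyyy");

    // Extracts the date of birth from a "name,MM/dd/yyyy" line.
    // A plain split is enough for this sample; FileUtils relies on OpenCSV instead.
    static LocalDate parseDob(String csvLine) {
        String[] columns = csvLine.split(",");
        return LocalDate.parse(columns[1], DOB_FORMAT);
    }

    public static void main(String[] args) {
        System.out.println(parseDob("Mae Hodges,10/22/1972")); // 1972-10-22

        try {
            parseDob("Mae Hodges,1972-10-22"); // wrong format for this pattern
        } catch (DateTimeParseException e) {
            System.out.println("Rejected: " + e.getParsedString());
        }
    }
}
```

A line whose date doesn't match the pattern raises a DateTimeParseException, so in the job a malformed row would make the reading step fail rather than be skipped silently.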

4.4 LinesReader

public class LinesReader implements Tasklet, StepExecutionListener {

    private final Logger logger = LoggerFactory
      .getLogger(LinesReader.class);

    private List<Line> lines;
    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        lines = new ArrayList<>();
        fu = new FileUtils(
          "taskletsvschunks/input/tasklets-vs-chunks.csv");
        logger.debug("Lines Reader initialized.");
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution, 
      ChunkContext chunkContext) throws Exception {
        Line line = fu.readLine();
        while (line != null) {
            lines.add(line);
            logger.debug("Read line: " + line.toString());
            line = fu.readLine();
        }
        return RepeatStatus.FINISHED;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeReader();
        stepExecution
          .getJobExecution()
          .getExecutionContext()
          .put("lines", this.lines);
        logger.debug("Lines Reader ended.");
        return ExitStatus.COMPLETED;
    }
}

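Key points:

  • The execute method loops over the file and reads every line into an in-memory list
  • afterStep stores that list in the job's execution context so that later steps can use it: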
    stepExecution
    .getJobExecution()
    .getExecutionContext()
    .put("lines", this.lines);
    

4.5 LinesProcessor

public class LinesProcessor implements Tasklet, StepExecutionListener {

    private Logger logger = LoggerFactory.getLogger(
      LinesProcessor.class);

    private List<Line> lines;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        ExecutionContext executionContext = stepExecution
          .getJobExecution()
          .getExecutionContext();
        this.lines = (List<Line>) executionContext.get("lines");
        logger.debug("Lines Processor initialized.");
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution, 
      ChunkContext chunkContext) throws Exception {
        for (Line line : lines) {
            long age = ChronoUnit.YEARS.between(
              line.getDob(), 
              LocalDate.now());
            logger.debug("Calculated age " + age + " for line " + line.toString());
            line.setAge(age);
        }
        return RepeatStatus.FINISHED;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        logger.debug("Lines Processor ended.");
        return ExitStatus.COMPLETED;
    }
}

✅ The processor modifies the list it obtained from the context in place, so there's no need to store it again.
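
Why no second put is needed can be shown with a plain map standing in for the job context. This is only a sketch of the reference semantics involved; SharedContextExample, Entry and the fixed age are hypothetical and not part of the project:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SharedContextExample {

    // Minimal mutable stand-in for the Line class (hypothetical)
    static class Entry {
        final String name;
        Long age;

        Entry(String name) {
            this.name = name;
        }
    }

    // "Reader" step: stores the list in the shared context
    static void read(Map<String, Object> context) {
        List<Entry> lines = new ArrayList<>();
        lines.add(new Entry("Mae Hodges"));
        context.put("lines", lines);
    }

    // "Processor" step: mutates the entries in place, without calling put again
    @SuppressWarnings("unchecked")
    static void process(Map<String, Object> context) {
        List<Entry> lines = (List<Entry>) context.get("lines");
        for (Entry line : lines) {
            line.age = 45L; // fixed value, for illustration only
        }
    }

    // "Writer" step: sees the ages set by the previous step
    @SuppressWarnings("unchecked")
    static List<String> write(Map<String, Object> context) {
        List<String> output = new ArrayList<>();
        for (Entry line : (List<Entry>) context.get("lines")) {
            output.add(line.name + "," + line.age);
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, Object> context = new HashMap<>(); // plays the role of the job context
        read(context);
        process(context);
        System.out.println(write(context)); // [Mae Hodges,45]
    }
}
```

The context holds a reference to the list, so the processing step and the writing step work on the same objects within one job execution.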

4.6 LinesWriter

public class LinesWriter implements Tasklet, StepExecutionListener {

    private final Logger logger = LoggerFactory
      .getLogger(LinesWriter.class);

    private List<Line> lines;
    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        ExecutionContext executionContext = stepExecution
          .getJobExecution()
          .getExecutionContext();
        this.lines = (List<Line>) executionContext.get("lines");
        fu = new FileUtils("output.csv");
        logger.debug("Lines Writer initialized.");
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution, 
      ChunkContext chunkContext) throws Exception {
        for (Line line : lines) {
            fu.writeLine(line);
            logger.debug("Wrote line " + line.toString());
        }
        return RepeatStatus.FINISHED;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeWriter();
        logger.debug("Lines Writer ended.");
        return ExitStatus.COMPLETED;
    }
}

4.7 Running the Job

Let's create a test class:

@SpringBatchTest
@EnableAutoConfiguration
@ContextConfiguration(classes = TaskletsConfig.class)
public class TaskletsIntegrationTest {

    @Autowired 
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    public void givenTaskletsJob_whenJobEnds_thenStatusCompleted()
      throws Exception {
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
        assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
    }
}

The configuration also needs the following beans:

@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
    return new JobLauncherTestUtils();
}

@Bean
public JobRepository jobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource());
    factory.setTransactionManager(transactionManager());
    return factory.getObject();
}

@Bean
public DataSource dataSource() {
    DriverManagerDataSource dataSource = new DriverManagerDataSource();
    dataSource.setDriverClassName("org.sqlite.JDBC");
    dataSource.setUrl("jdbc:sqlite:repository.sqlite");
    return dataSource;
}

@Bean
public PlatformTransactionManager transactionManager() {
    return new ResourcelessTransactionManager();
}

@Bean
public JobLauncher jobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository());
    return jobLauncher;
}

Running the test generates output.csv, and the log shows the execution flow:

[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader initialized.
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader ended.
[main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor initialized.
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor ended.
[main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer initialized.
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
[main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer ended.

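5. Chunks Approach

5.1 Design

As the name suggests, this approach works on chunks of data. Instead of reading, processing and writing all the lines at once, it reads, processes and writes a fixed number of records (a chunk) at a time.

The flow then becomes:

  1. Loop until the end of the file:
    • Read X records, one line at a time
    • Process those X records, one at a time
    • Write the X records together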
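
The loop can be sketched in plain Java. This is a simplified model rather than Spring Batch's implementation: ChunkLoopExample is a hypothetical class, upper-casing stands in for the processing logic, and the ordering (read the chunk, then process it, then write it) follows the log shown in section 5.6:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class ChunkLoopExample {

    // Runs the read/process/write cycle and returns a log of the calls made
    static List<String> run(List<String> input, int chunkSize) {
        List<String> log = new ArrayList<>();
        Iterator<String> reader = input.iterator();

        while (reader.hasNext()) {
            // 1. Read up to chunkSize items, one at a time
            List<String> items = new ArrayList<>();
            while (items.size() < chunkSize && reader.hasNext()) {
                String item = reader.next();
                log.add("read " + item);
                items.add(item);
            }

            // 2. Process each item of the chunk
            List<String> processed = new ArrayList<>();
            for (String item : items) {
                log.add("process " + item);
                processed.add(item.toUpperCase());
            }

            // 3. Write the whole chunk in one call
            log.add("write " + processed);
        }
        return log;
    }

    public static void main(String[] args) {
        for (String entry : run(Arrays.asList("a", "b", "c"), 2)) {
            System.out.println(entry);
        }
    }
}
```

With three items and a chunk size of 2, the last chunk holds a single item: the loop writes whatever it managed to read before the input ran out.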

We create three components:

public class LineReader {
     // ...
}
public class LineProcessor {
    // ...
}
public class LinesWriter {
    // ...
}

5.2 Configuration

The job configuration looks noticeably different:

@Configuration
public class ChunksConfig {

    @Bean
    public ItemReader<Line> itemReader() {
        return new LineReader();
    }

    @Bean
    public ItemProcessor<Line, Line> itemProcessor() {
        return new LineProcessor();
    }

    @Bean
    public ItemWriter<Line> itemWriter() {
        return new LinesWriter();
    }

    @Bean
    protected Step processLines(JobRepository jobRepository, PlatformTransactionManager transactionManager, ItemReader<Line> reader,
      ItemProcessor<Line, Line> processor, ItemWriter<Line> writer) {
        return new StepBuilder("processLines", jobRepository).<Line, Line> chunk(2, transactionManager)
          .reader(reader)
          .processor(processor)
          .writer(writer)
          .build();
    }

    @Bean
    public Job job(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new JobBuilder("chunksJob", jobRepository)
          .start(processLines(jobRepository, transactionManager, itemReader(), itemProcessor(), itemWriter()))
          .build();
    }

}

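Key points:

  • A single step contains the reader, the processor and the writer
  • chunk(2, transactionManager) sets the chunk size to 2 records, so the step commits after every 2 records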

5.3 LineReader

LineReader implements the ItemReader interface:

public class LineReader implements 
  ItemReader<Line>, StepExecutionListener {

    private final Logger logger = LoggerFactory
      .getLogger(LineReader.class);
 
    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv");
        logger.debug("Line Reader initialized.");
    }

    @Override
    public Line read() throws Exception {
        Line line = fu.readLine();
        if (line != null) logger.debug("Read line: " + line.toString());
        return line;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeReader();
        logger.debug("Line Reader ended.");
        return ExitStatus.COMPLETED;
    }
}

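⚠️ Note: beforeStep and afterStep run once, when the whole step starts and ends, not once per record. The read() method is called once per record and returns null at the end of the input, which tells Spring Batch that there is nothing left to read.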

5.4 LineProcessor

LineProcessor implements the ItemProcessor interface:

public class LineProcessor implements 
  ItemProcessor<Line, Line>, StepExecutionListener {

    private Logger logger = LoggerFactory.getLogger(LineProcessor.class);

    @Override
    public void beforeStep(StepExecution stepExecution) {
        logger.debug("Line Processor initialized.");
    }
    
    @Override
    public Line process(Line line) throws Exception {
        long age = ChronoUnit.YEARS
          .between(line.getDob(), LocalDate.now());
        logger.debug(
          "Calculated age " + age + " for line " + line.toString());
        line.setAge(age);
        return line;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        logger.debug("Line Processor ended.");
        return ExitStatus.COMPLETED;
    }
}

The process() method receives an input line, processes it and returns the output line.

5.5 LinesWriter

LinesWriter implements the ItemWriter interface and writes a whole chunk at a time:

public class LinesWriter implements 
  ItemWriter<Line>, StepExecutionListener {

    private final Logger logger = LoggerFactory
      .getLogger(LinesWriter.class);
 
    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        fu = new FileUtils("output.csv");
        logger.debug("Line Writer initialized.");
    }

    // Spring Batch 5 passes an org.springframework.batch.item.Chunk here instead of a List
    @Override
    public void write(Chunk<? extends Line> lines) throws Exception {
        for (Line line : lines) {
            fu.writeLine(line);
            logger.debug("Wrote line " + line.toString());
        }
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeWriter();
        logger.debug("Line Writer ended.");
        return ExitStatus.COMPLETED;
    }
}

5.6 Running the Job

Let's create a test class:

@SpringBatchTest
@EnableAutoConfiguration
@ContextConfiguration(classes = ChunksConfig.class)
public class ChunksIntegrationTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    public void givenChunksJob_whenJobEnds_thenStatusCompleted() 
      throws Exception {
 
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
 
        assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus()); 
    }
}

Running the test generates the same output.csv, and the log shows the chunk-by-chunk flow:

[main] DEBUG o.b.t.chunks.LineReader - Line Reader initialized.
[main] DEBUG o.b.t.chunks.LinesWriter - Line Writer initialized.
[main] DEBUG o.b.t.chunks.LineProcessor - Line Processor initialized.
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
[main] DEBUG o.b.t.chunks.LineProcessor - Line Processor ended.
[main] DEBUG o.b.t.chunks.LinesWriter - Line Writer ended.
[main] DEBUG o.b.t.chunks.LineReader - Line Reader ended.

✅ The result is the same but the flow differs: the log shows the lines being read, processed and written two at a time.

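6. Conclusion

Different scenarios call for different approaches:

  • Tasklets feel more natural for "one task after another" scenarios (for example, cleaning up files and then running an import)
  • Chunks are the more straightforward choice when we need:
    • paginated reads
    • to avoid holding a large amount of data in memory
    • transaction-controlled data processing

The complete implementation is available in the GitHub project.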


Original title: Spring Batch - Tasklets vs Chunks