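1. Introduction
Spring Batch provides two ways to implement a job: tasklets and chunks. This article uses a simple real-world example to show how to configure and implement both.
2. Dependencies
First, add the required dependencies: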
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>5.1.1</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<version>5.1.1</version>
<scope>test</scope>
</dependency>
The latest versions of spring-batch-core and spring-batch-test are available on Maven Central.
3. Use Case
Consider a CSV file with the following content:
Mae Hodges,10/22/1972
Gary Potter,02/22/1953
Betty Wise,02/17/1968
Wayne Rose,04/06/1977
Adam Caldwell,09/27/1995
Lucille Phillips,05/14/1992
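The first column of each line is a person's name and the second is their date of birth. Our goal is to generate a new CSV file containing each name and age: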
Mae Hodges,45
Gary Potter,64
Betty Wise,49
Wayne Rose,40
Adam Caldwell,22
Lucille Phillips,25
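To make the expected ages concrete, here is a minimal sketch of the calculation for a single row, using only java.time. The class name AgeExample, the method ageOn, and the fixed reference date 2017-12-01 are assumptions made for illustration. The sample ages above are consistent with a run date in late 2017; a real run uses the current date, so the numbers will differ.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

public class AgeExample {

    // Same date pattern as the sample CSV: month/day/year.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("MM/dd/yyyy");

    // Returns the number of whole years between the date of birth and the reference date.
    static long ageOn(String dob, LocalDate referenceDate) {
        LocalDate birthDate = LocalDate.parse(dob, FORMAT);
        return ChronoUnit.YEARS.between(birthDate, referenceDate);
    }

    public static void main(String[] args) {
        // Assumed reference date; a real job would use LocalDate.now().
        LocalDate referenceDate = LocalDate.of(2017, 12, 1);
        System.out.println("Mae Hodges," + ageOn("10/22/1972", referenceDate));
        System.out.println("Gary Potter," + ageOn("02/22/1953", referenceDate));
    }
}
```

With that reference date, the two rows come out as 45 and 64, matching the first two lines of the expected output.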
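We'll now implement the job both ways, starting with tasklets.
4. Tasklets Approach
4.1 Design
A tasklet is meant to perform a single task within a step. The job consists of several steps that execute one after another, and each step performs exactly one well-defined task.
Our job has three steps:
- Read the lines from the input CSV file
- Calculate the age of each person
- Write each name and age to a new CSV file
We create one class per step.
LinesReader reads the input file:
public class LinesReader implements Tasklet {
// ...
}
LinesProcessor calculates the ages:
public class LinesProcessor implements Tasklet {
// ...
}
LinesWriter writes the results:
public class LinesWriter implements Tasklet {
// ...
}
Every step class implements the Tasklet interface, which requires an execute method: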
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
// ...
}
4.2 Configuration
Add the job configuration to the Spring application context:
@Configuration
public class TaskletsConfig {
@Bean
protected Step readLines(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("readLines", jobRepository)
.tasklet(linesReader(), transactionManager)
.build();
}
@Bean
protected Step processLines(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("processLines", jobRepository)
.tasklet(linesProcessor(), transactionManager)
.build();
}
@Bean
protected Step writeLines(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("writeLines", jobRepository)
.tasklet(linesWriter(), transactionManager)
.build();
}
@Bean
public Job job(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("taskletsJob", jobRepository)
.start(readLines(jobRepository, transactionManager))
.next(processLines(jobRepository, transactionManager))
.next(writeLines(jobRepository, transactionManager))
.build();
}
// ...
}
With the job flow defined, we can add the business logic.
4.3 Model and Utilities
Create a Line class to represent one row of the CSV file:
public class Line implements Serializable {
private String name;
private LocalDate dob;
private Long age;
// standard constructors, getters/setters, and toString
}
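Note: Line implements Serializable because Line objects are stored in the job's execution context to pass data between steps, and Spring Batch requires such objects to be serializable.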
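As a rough illustration of what that requirement means in practice, the sketch below round-trips an object through plain Java serialization. It does not use Spring Batch at all: the Person class and the roundTrip helper are hypothetical stand-ins for Line and for whatever serializer the job repository is configured with.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.LocalDate;

public class SerializableExample {

    // Hypothetical stand-in for Line; String and LocalDate are themselves Serializable.
    static class Person implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final LocalDate dob;

        Person(String name, LocalDate dob) {
            this.name = name;
            this.dob = dob;
        }
    }

    // Writes the object to a byte array and reads it back, as a persistent store would.
    static Object roundTrip(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Person copy = (Person) roundTrip(new Person("Mae Hodges", LocalDate.of(1972, 10, 22)));
        System.out.println(copy.name + " " + copy.dob);
    }
}
```

If Person did not implement Serializable, writeObject would throw java.io.NotSerializableException, which this sketch wraps in an IllegalStateException.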
We use OpenCSV to read and write the files:
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>5.8</version>
</dependency>
Create a FileUtils utility class:
public class FileUtils {
    // Opened lazily by initReader() and initWriter(), which are elided below
    private CSVReader csvReader;
    private CSVWriter csvWriter;

    public Line readLine() throws Exception {
        if (csvReader == null)
            initReader();
        String[] line = csvReader.readNext();
        // readNext() returns null at the end of the file
        if (line == null)
            return null;
        return new Line(
          line[0],
          LocalDate.parse(
            line[1],
            DateTimeFormatter.ofPattern("MM/dd/yyyy")));
    }

    public void writeLine(Line line) throws Exception {
        if (csvWriter == null)
            initWriter();
        // One output row: name, age
        String[] lineStr = new String[2];
        lineStr[0] = line.getName();
        lineStr[1] = line
          .getAge()
          .toString();
        csvWriter.writeNext(lineStr);
    }
    // ...
}
4.4 LinesReader Implementation
public class LinesReader implements Tasklet, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LinesReader.class);
private List<Line> lines;
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
lines = new ArrayList<>();
fu = new FileUtils(
"taskletsvschunks/input/tasklets-vs-chunks.csv");
logger.debug("Lines Reader initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
Line line = fu.readLine();
while (line != null) {
lines.add(line);
logger.debug("Read line: " + line.toString());
line = fu.readLine();
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeReader();
stepExecution
.getJobExecution()
.getExecutionContext()
.put("lines", this.lines);
logger.debug("Lines Reader ended.");
return ExitStatus.COMPLETED;
}
}
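Key points:
- The execute method reads every line of the file into an in-memory list.
- The afterStep method stores that list in the job's execution context so that later steps can use it: stepExecution.getJobExecution().getExecutionContext().put("lines", this.lines);
4.5 LinesProcessor Implementation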
public class LinesProcessor implements Tasklet, StepExecutionListener {
private Logger logger = LoggerFactory.getLogger(
LinesProcessor.class);
private List<Line> lines;
@Override
public void beforeStep(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution
.getJobExecution()
.getExecutionContext();
this.lines = (List<Line>) executionContext.get("lines");
logger.debug("Lines Processor initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
for (Line line : lines) {
long age = ChronoUnit.YEARS.between(
line.getDob(),
LocalDate.now());
logger.debug("Calculated age " + age + " for line " + line.toString());
line.setAge(age);
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.debug("Lines Processor ended.");
return ExitStatus.COMPLETED;
}
}
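Because the processor modifies the Line objects in the list it retrieved from the context, there is no need to put the list back.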
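No second put is needed because of ordinary Java reference semantics: the context holds a reference to the list, so changes to the list's elements are visible to whoever reads the same key later in the same run. Here is a minimal sketch, using a plain HashMap as an assumed stand-in for the job execution context. The class, field, and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SharedContextExample {

    // Hypothetical stand-in for Line, reduced to the fields the sketch needs.
    static class Item {
        final String name;
        Long age;

        Item(String name) {
            this.name = name;
        }
    }

    // Simulates a "reader" step storing a list and a "processor" step mutating it in place.
    static Map<String, Object> runSteps() {
        // Assumed stand-in for the job execution context.
        Map<String, Object> context = new HashMap<>();

        // "Reader" step: store the list once.
        List<Item> lines = new ArrayList<>();
        lines.add(new Item("Mae Hodges"));
        context.put("lines", lines);

        // "Processor" step: fetch the same list and update its elements; no second put.
        @SuppressWarnings("unchecked")
        List<Item> fetched = (List<Item>) context.get("lines");
        for (Item item : fetched) {
            item.age = 45L;
        }
        return context;
    }

    public static void main(String[] args) {
        @SuppressWarnings("unchecked")
        List<Item> lines = (List<Item>) runSteps().get("lines");
        System.out.println(lines.get(0).name + "," + lines.get(0).age);
    }
}
```

This only describes steps running in the same JVM within one execution. What is restored after a restart depends on what was persisted to the job repository.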
4.6 LinesWriter Implementation
public class LinesWriter implements Tasklet, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LinesWriter.class);
private List<Line> lines;
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution
.getJobExecution()
.getExecutionContext();
this.lines = (List<Line>) executionContext.get("lines");
fu = new FileUtils("output.csv");
logger.debug("Lines Writer initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
for (Line line : lines) {
fu.writeLine(line);
logger.debug("Wrote line " + line.toString());
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeWriter();
logger.debug("Lines Writer ended.");
return ExitStatus.COMPLETED;
}
}
4.7 Running the Job
Create a test class:
@SpringBatchTest
@EnableAutoConfiguration
@ContextConfiguration(classes = TaskletsConfig.class)
public class TaskletsIntegrationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
public void givenTaskletsJob_whenJobEnds_thenStatusCompleted()
throws Exception {
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
}
Add the required beans to the configuration:
@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
return new JobLauncherTestUtils();
}
@Bean
public JobRepository jobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource());
factory.setTransactionManager(transactionManager());
return factory.getObject();
}
@Bean
public DataSource dataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("org.sqlite.JDBC");
dataSource.setUrl("jdbc:sqlite:repository.sqlite");
return dataSource;
}
@Bean
public PlatformTransactionManager transactionManager() {
return new ResourcelessTransactionManager();
}
@Bean
public JobLauncher jobLauncher() throws Exception {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository());
return jobLauncher;
}
Running the job generates output.csv, and the log shows the execution flow:
[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader initialized.
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader ended.
[main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor initialized.
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor ended.
[main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer initialized.
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
[main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer ended.
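5. Chunks Approach
5.1 Design
As the name suggests, this approach processes the data in chunks. Instead of reading, processing, and writing all the lines at once, it reads, processes, and writes a fixed number of records (a chunk) at a time, and repeats the cycle until the file is exhausted.
The execution flow becomes:
- Repeat until the end of the file:
  - For X records:
    - Read one line
    - Process one line
  - Write X records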
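The flow above can be sketched in plain Java, without Spring Batch. This is a simplified illustration, not how the framework is implemented: the class name ChunkLoopExample, the run method, and the upper-casing "processor" are assumptions. The sketch reads the items of a chunk first and then processes them, which matches the order seen in the log output later in this section.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ChunkLoopExample {

    // Runs a simplified read-process-write loop and returns the chunks in the order written.
    static List<List<String>> run(List<String> input, int chunkSize) {
        Iterator<String> reader = input.iterator();
        List<List<String>> written = new ArrayList<>();

        // Repeat until the input is exhausted.
        while (reader.hasNext()) {
            // Read up to chunkSize items, one at a time.
            List<String> items = new ArrayList<>();
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }

            // Process each item of the chunk (here: a placeholder transformation).
            List<String> processed = new ArrayList<>();
            for (String item : items) {
                processed.add(item.toUpperCase());
            }

            // Write the whole chunk at once.
            written.add(processed);
        }
        return written;
    }

    public static void main(String[] args) {
        // Five items with a chunk size of 2: the last chunk is smaller.
        System.out.println(run(Arrays.asList("a", "b", "c", "d", "e"), 2));
        // prints [[A, B], [C, D], [E]]
    }
}
```

Only one chunk is held in memory at a time, which is the main practical difference from the tasklet version that loads every line into a list.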
Create three components:
public class LineReader {
// ...
}
public class LineProcessor {
// ...
}
public class LinesWriter {
// ...
}
5.2 Configuration
The job configuration looks quite different:
@Configuration
public class ChunksConfig {
@Bean
public ItemReader<Line> itemReader() {
return new LineReader();
}
@Bean
public ItemProcessor<Line, Line> itemProcessor() {
return new LineProcessor();
}
@Bean
public ItemWriter<Line> itemWriter() {
return new LinesWriter();
}
@Bean
protected Step processLines(JobRepository jobRepository, PlatformTransactionManager transactionManager, ItemReader<Line> reader,
ItemProcessor<Line, Line> processor, ItemWriter<Line> writer) {
return new StepBuilder("processLines", jobRepository).<Line, Line> chunk(2, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public Job job(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("chunksJob", jobRepository)
.start(processLines(jobRepository, transactionManager, itemReader(), itemProcessor(), itemWriter()))
.build();
}
}
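Key points:
- A single step contains the reader, the processor, and the writer.
- chunk(2, transactionManager) sets the chunk size to 2, so each chunk holds two records.
5.3 LineReader Implementation
LineReader implements the ItemReader interface: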
public class LineReader implements
ItemReader<Line>, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LineReader.class);
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv");
logger.debug("Line Reader initialized.");
}
@Override
public Line read() throws Exception {
Line line = fu.readLine();
if (line != null) logger.debug("Read line: " + line.toString());
return line;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeReader();
logger.debug("Line Reader ended.");
return ExitStatus.COMPLETED;
}
}
Note: beforeStep and afterStep run once, at the start and end of the whole step, not once per record.
5.4 LineProcessor Implementation
LineProcessor implements the ItemProcessor interface:
public class LineProcessor implements
ItemProcessor<Line, Line>, StepExecutionListener {
private Logger logger = LoggerFactory.getLogger(LineProcessor.class);
@Override
public void beforeStep(StepExecution stepExecution) {
logger.debug("Line Processor initialized.");
}
@Override
public Line process(Line line) throws Exception {
long age = ChronoUnit.YEARS
.between(line.getDob(), LocalDate.now());
logger.debug(
"Calculated age " + age + " for line " + line.toString());
line.setAge(age);
return line;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.debug("Line Processor ended.");
return ExitStatus.COMPLETED;
}
}
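The process() method takes an input line, processes it, and returns the output line.
5.5 LinesWriter Implementation
LinesWriter implements the ItemWriter interface and writes a whole chunk at a time: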
public class LinesWriter implements
ItemWriter<Line>, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LinesWriter.class);
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
fu = new FileUtils("output.csv");
logger.debug("Line Writer initialized.");
}
@Override
public void write(Chunk<? extends Line> lines) throws Exception {
for (Line line : lines) {
fu.writeLine(line);
logger.debug("Wrote line " + line.toString());
}
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeWriter();
logger.debug("Line Writer ended.");
return ExitStatus.COMPLETED;
}
}
5.6 Running the Job
Create a test class:
@SpringBatchTest
@EnableAutoConfiguration
@ContextConfiguration(classes = ChunksConfig.class)
public class ChunksIntegrationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
public void givenChunksJob_whenJobEnds_thenStatusCompleted()
throws Exception {
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
}
Running the job generates the same output.csv, and the log shows the chunked flow:
[main] DEBUG o.b.t.chunks.LineReader - Line Reader initialized.
[main] DEBUG o.b.t.chunks.LinesWriter - Line Writer initialized.
[main] DEBUG o.b.t.chunks.LineProcessor - Line Processor initialized.
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
[main] DEBUG o.b.t.chunks.LineProcessor - Line Processor ended.
[main] DEBUG o.b.t.chunks.LinesWriter - Line Writer ended.
[main] DEBUG o.b.t.chunks.LineReader - Line Reader ended.
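The result is the same, but the execution flow differs: the log clearly shows the records being read, processed, and written two at a time.
6. Conclusion
Different scenarios call for different approaches:
- Tasklets are a better fit for chaining tasks one after another (for example, cleaning up files and then running an import).
- Chunks are the simpler choice when:
  - reading is paginated
  - we want to avoid holding a large amount of data in memory
  - the data processing needs transaction control
The complete implementation is available in the GitHub project.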