1. 引言
Spring Batch 提供了强大的作业重启机制,允许作业从失败点恢复执行。这对高效处理大规模数据任务至关重要。
Spring Batch 内置的 JobRepository
会持久化作业执行状态,使作业默认具备重启能力。因此,失败作业能精确从中断处恢复,避免重复处理或数据丢失。
本文将探讨如何有效配置和重启失败的 Spring Batch 作业。
2. Maven 依赖
首先在 pom.xml
中添加以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.2.224</version>
</dependency>
必须使用文件型 H2 数据库,通过持久化作业状态实现跨应用重启。
3. 定义 Spring Batch 作业
本节创建一个简单的 CSV 处理作业。在 Spring Boot 3 中,**避免使用 @EnableBatchProcessing
**,它会禁用自动配置(如自动创建 Batch 表)。
3.1. 配置
创建 BatchConfig
类定义作业:
@Configuration
public class BatchConfig {
@Bean
public Job simpleJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("simpleJob", jobRepository)
.start(step1(jobRepository, transactionManager))
.build();
}
}
simpleJob
使用 JobBuilder
构建,包含一个步骤 step1
:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(flatFileItemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
该步骤以 2 条记录为批次处理数据,通过 FlatFileItemReader
读取、ItemProcessor
转换、ItemWriter
写出。**JobRepository
会持久化步骤状态(包括读取位置),确保失败后能从最后未提交的批次恢复**。
3.2. Item Reader
定义 flatFileItemReader
提供输入数据:
@Bean
@StepScope
public FlatFileItemReader<String> flatFileItemReader() {
return new FlatFileItemReaderBuilder<String>()
.name("itemReader")
.resource(new ClassPathResource("data.csv"))
.lineMapper(new PassThroughLineMapper())
.saveState(true)
.build();
}
关键点:
@StepScope
确保每次步骤执行创建新实例- 读取类路径下的
data.csv
文件 - **
saveState(true)
持久化读取位置到ExecutionContext
**,失败后可从断点恢复
⚠️ 要实现真正的重启能力,ItemReader
必须持久化状态。FlatFileItemReader
等内置读取器会自动保存行号等关键信息。
3.3. Item Processor
定义 itemProcessor
转换数据:
@Bean
public RestartItemProcessor itemProcessor() {
return new RestartItemProcessor();
}
static class RestartItemProcessor implements ItemProcessor<String, String> {
private boolean failOnItem3 = true;
public void setFailOnItem3(boolean failOnItem3) {
this.failOnItem3 = failOnItem3;
}
@Override
public String process(String item) throws Exception {
System.out.println("Processing: " + item + " (failOnItem3=" + failOnItem3 + ")");
if (failOnItem3 && item.equals("Item3")) {
throw new RuntimeException("Simulated failure on Item3");
}
return "PROCESSED " + item;
}
}
该处理器在遇到 Item3
时会模拟失败(通过 failOnItem3
标志控制)。
3.4. Item Writer
创建 itemWriter
输出处理结果:
@Bean
public ItemWriter<String> itemWriter() {
return items -> {
System.out.println("Writing items:");
for (String item : items) {
System.out.println("- " + item);
}
};
}
简单打印处理结果到控制台。
4. 重启失败的 Spring Batch 作业
Spring Batch 作业默认可重启,无需额外配置。但需满足:
- 作业状态持久化到
JobRepository
JobRepository
使用数据库存储(本文用 H2)
4.1. 模拟作业失败
通过 ItemProcessor
在处理 Item3
时抛出异常。执行 mvn spring-boot:run
输出:
Starting new job execution...
Processing: Item1
Processing: Item2
Writing items:
- PROCESSED Item1
- PROCESSED Item2
Processing: Item3
[Exception: Simulated failure on Item3]
Job started with status: FAILED
✅ Item1
和 Item2
处理成功,Item3
失败,状态已持久化。
4.2. 重启作业
使用 CommandLineRunner
检测失败作业并重启:
@Bean
CommandLineRunner run(JobLauncher jobLauncher, Job job, JobExplorer jobExplorer,
JobOperator jobOperator, BatchConfig.RestartItemProcessor itemProcessor) {
return args -> {
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobId", "test-job-" + System.currentTimeMillis())
.toJobParameters();
List<JobInstance> instances = jobExplorer.getJobInstances("simpleJob", 0, 1);
if (!instances.isEmpty()) {
JobInstance lastInstance = instances.get(0);
List<JobExecution> executions = jobExplorer.getJobExecutions(lastInstance);
if (!executions.isEmpty()) {
JobExecution lastExecution = executions.get(0);
if (lastExecution.getStatus() == BatchStatus.FAILED) {
itemProcessor.setFailOnItem3(false); // 关闭失败模拟
// 方式1:使用 JobLauncher
JobExecution restartedExecution = jobLauncher.run(job, jobParameters);
// 方式2:使用 JobOperator
// final Long restartId = jobOperator.restart(lastExecution.getId());
// final JobExecution restartedExecution = jobExplorer.getJobExecution(restartId);
}
}
}
};
}
关键步骤:
- 通过
JobExplorer
检测失败作业 - 关闭失败模拟(
setFailOnItem3(false)
) - 使用
JobLauncher.run()
或JobOperator.restart()
重启
再次运行应用输出:
Restarting failed job execution with ID: [execution_id]
Processing: Item3
Processing: Item4
Writing items:
- PROCESSED Item3
- PROCESSED Item4
Processing: Item5
Writing items:
- PROCESSED Item5
Restarted job status: COMPLETED
✅ 作业从 Item3
恢复,处理到 Item5
完成。
4.3. 测试作业重启
使用单元测试验证重启功能:
@Test
public void givenItems_whenFailed_thenRestartFromFailure() throws Exception {
// Given
createTestFile("Item1\nItem2\nItem3\nItem4");
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
// When: 首次执行(预期失败)
JobExecution firstExecution = jobLauncherTestUtils.launchJob(jobParameters);
assertEquals(BatchStatus.FAILED, firstExecution.getStatus());
// 修改处理器行为
itemProcessor.setFailOnItem3(false);
// Then: 重启执行
JobExecution restartedExecution = jobLauncherTestUtils.launchJob(jobParameters);
assertEquals(BatchStatus.COMPLETED, restartedExecution.getStatus());
// 验证是同一作业实例
assertEquals(
firstExecution.getJobInstance().getInstanceId(),
restartedExecution.getJobInstance().getInstanceId()
);
}
测试验证三点:
- 初始失败符合预期
- 重启后从断点继续
- 两次执行属于同一作业实例
4.4. 禁止作业重启
使用 preventRestart()
可禁用重启功能:
@Bean
public Job simpleJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("simpleJob", jobRepository)
.start(step1(jobRepository, transactionManager))
.preventRestart() // 禁止重启
.build();
}
添加 .preventRestart()
后,重启时作业将从头开始(如 Item1
),而不是从失败点(如 Item3
)恢复。
5. 结论
Spring Batch 的默认重启机制能可靠恢复失败作业,确保:
- 从断点继续执行
- 避免重复处理已完成数据
- 防止数据丢失
本文通过示例演示了:
- 创建可重启的分块处理作业
- 模拟
Item3
处理失败 - 重启后从
Item3
继续执行至完成 - 使用
preventRestart()
覆盖默认行为
完整源码可在 GitHub 获取。