1. 引言

Spring Batch 提供了强大的作业重启机制,允许作业从失败点恢复执行。这对高效处理大规模数据任务至关重要。

Spring Batch 内置的 JobRepository 会持久化作业执行状态,使作业默认具备重启能力。因此,失败作业能精确从中断处恢复,避免重复处理或数据丢失。

本文将探讨如何有效配置和重启失败的 Spring Batch 作业。

2. Maven 依赖

首先在 pom.xml 中添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
    <version>3.3.2</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
    <version>3.3.2</version>
</dependency>
      
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>2.2.224</version>
</dependency>

必须使用文件型 H2 数据库,通过持久化作业状态实现跨应用重启

3. 定义 Spring Batch 作业

本节创建一个简单的 CSV 处理作业。在 Spring Boot 3 中,**避免使用 @EnableBatchProcessing**,它会禁用自动配置(如自动创建 Batch 表)。

3.1. 配置

创建 BatchConfig 类定义作业:

@Configuration
public class BatchConfig {

    @Bean
    public Job simpleJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new JobBuilder("simpleJob", jobRepository)
          .start(step1(jobRepository, transactionManager))
          .build();
    }
}

simpleJob 使用 JobBuilder 构建,包含一个步骤 step1

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("step1", jobRepository)
      .<String, String>chunk(2, transactionManager)
      .reader(flatFileItemReader())
      .processor(itemProcessor())
      .writer(itemWriter())
      .build();
}

该步骤以 2 条记录为批次处理数据,通过 FlatFileItemReader 读取、ItemProcessor 转换、ItemWriter 写出。**JobRepository 会持久化步骤状态(包括读取位置),确保失败后能从最后未提交的批次恢复**。

3.2. Item Reader

定义 flatFileItemReader 提供输入数据:

@Bean
@StepScope
public FlatFileItemReader<String> flatFileItemReader() {
    return new FlatFileItemReaderBuilder<String>()
      .name("itemReader")
      .resource(new ClassPathResource("data.csv"))
      .lineMapper(new PassThroughLineMapper())
      .saveState(true)
      .build();
}

关键点:

  • @StepScope 确保每次步骤执行创建新实例
  • 读取类路径下的 data.csv 文件
  • **saveState(true) 持久化读取位置到 ExecutionContext**,失败后可从断点恢复

⚠️ 要实现真正的重启能力,ItemReader 必须持久化状态。FlatFileItemReader 等内置读取器会自动保存行号等关键信息。

3.3. Item Processor

定义 itemProcessor 转换数据:

@Bean
public RestartItemProcessor itemProcessor() {
    return new RestartItemProcessor();
}

static class RestartItemProcessor implements ItemProcessor<String, String> {
    private boolean failOnItem3 = true;

    public void setFailOnItem3(boolean failOnItem3) {
        this.failOnItem3 = failOnItem3;
    }

    @Override
    public String process(String item) throws Exception {
        System.out.println("Processing: " + item + " (failOnItem3=" + failOnItem3 + ")");
        if (failOnItem3 && item.equals("Item3")) {
            throw new RuntimeException("Simulated failure on Item3");
        }
        return "PROCESSED " + item;
    }
}

该处理器在遇到 Item3 时会模拟失败(通过 failOnItem3 标志控制)。

3.4. Item Writer

创建 itemWriter 输出处理结果:

@Bean
public ItemWriter<String> itemWriter() {
    return items -> {
      System.out.println("Writing items:");
      for (String item : items) {
          System.out.println("- " + item);
      }
    };
}

简单打印处理结果到控制台。

4. 重启失败的 Spring Batch 作业

Spring Batch 作业默认可重启,无需额外配置。但需满足:

  1. 作业状态持久化到 JobRepository
  2. JobRepository 使用数据库存储(本文用 H2)

4.1. 模拟作业失败

通过 ItemProcessor 在处理 Item3 时抛出异常。执行 mvn spring-boot:run 输出:

Starting new job execution...
Processing: Item1
Processing: Item2
Writing items:
- PROCESSED Item1
- PROCESSED Item2
Processing: Item3
[Exception: Simulated failure on Item3]
Job started with status: FAILED

Item1Item2 处理成功,Item3 失败,状态已持久化。

4.2. 重启作业

使用 CommandLineRunner 检测失败作业并重启:

@Bean
CommandLineRunner run(JobLauncher jobLauncher, Job job, JobExplorer jobExplorer,
    JobOperator jobOperator, BatchConfig.RestartItemProcessor itemProcessor) {
    return args -> {
      JobParameters jobParameters = new JobParametersBuilder()
        .addString("jobId", "test-job-" + System.currentTimeMillis())
        .toJobParameters();

      List<JobInstance> instances = jobExplorer.getJobInstances("simpleJob", 0, 1);
      if (!instances.isEmpty()) {
          JobInstance lastInstance = instances.get(0);
          List<JobExecution> executions = jobExplorer.getJobExecutions(lastInstance);
          if (!executions.isEmpty()) {
              JobExecution lastExecution = executions.get(0);
              if (lastExecution.getStatus() == BatchStatus.FAILED) {
                  itemProcessor.setFailOnItem3(false); // 关闭失败模拟

                  // 方式1:使用 JobLauncher
                  JobExecution restartedExecution = jobLauncher.run(job, jobParameters);

                  // 方式2:使用 JobOperator
                  // final Long restartId = jobOperator.restart(lastExecution.getId());
                  // final JobExecution restartedExecution = jobExplorer.getJobExecution(restartId);
              }
          }
      }
    };
}

关键步骤:

  1. 通过 JobExplorer 检测失败作业
  2. 关闭失败模拟(setFailOnItem3(false)
  3. 使用 JobLauncher.run()JobOperator.restart() 重启

再次运行应用输出:

Restarting failed job execution with ID: [execution_id]
Processing: Item3
Processing: Item4
Writing items:
- PROCESSED Item3
- PROCESSED Item4
Processing: Item5
Writing items:
- PROCESSED Item5
Restarted job status: COMPLETED

✅ 作业从 Item3 恢复,处理到 Item5 完成。

4.3. 测试作业重启

使用单元测试验证重启功能:

@Test
public void givenItems_whenFailed_thenRestartFromFailure() throws Exception {
    // Given
    createTestFile("Item1\nItem2\nItem3\nItem4");

    JobParameters jobParameters = new JobParametersBuilder()
      .addLong("time", System.currentTimeMillis())
      .toJobParameters();

    // When: 首次执行(预期失败)
    JobExecution firstExecution = jobLauncherTestUtils.launchJob(jobParameters);
    assertEquals(BatchStatus.FAILED, firstExecution.getStatus());

    // 修改处理器行为
    itemProcessor.setFailOnItem3(false);

    // Then: 重启执行
    JobExecution restartedExecution = jobLauncherTestUtils.launchJob(jobParameters);
    assertEquals(BatchStatus.COMPLETED, restartedExecution.getStatus());

    // 验证是同一作业实例
    assertEquals(
      firstExecution.getJobInstance().getInstanceId(),
      restartedExecution.getJobInstance().getInstanceId()
    );
}

测试验证三点:

  1. 初始失败符合预期
  2. 重启后从断点继续
  3. 两次执行属于同一作业实例

4.4. 禁止作业重启

使用 preventRestart() 可禁用重启功能

@Bean
public Job simpleJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new JobBuilder("simpleJob", jobRepository)
      .start(step1(jobRepository, transactionManager))
      .preventRestart() // 禁止重启
      .build();
}

添加 .preventRestart() 后,重启时作业将从头开始(如 Item1),而不是从失败点(如 Item3)恢复。

5. 结论

Spring Batch 的默认重启机制能可靠恢复失败作业,确保:

  • 从断点继续执行
  • 避免重复处理已完成数据
  • 防止数据丢失

本文通过示例演示了:

  1. 创建可重启的分块处理作业
  2. 模拟 Item3 处理失败
  3. 重启后从 Item3 继续执行至完成
  4. 使用 preventRestart() 覆盖默认行为

完整源码可在 GitHub 获取。


原始标题:Restart a Job on Failure and Continue in Spring Batch | Baeldung