2. Spring Batch Job Basics

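Spring Batch is a powerful batch-processing framework that simplifies large-volume data processing by providing reusable components and a reliable infrastructure. In practice, we often need to run several jobs, either in a specific order or at the same time, to optimize performance and manage dependencies effectively.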

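A Job in Spring Batch is a container for Step instances and represents the entire process. Each job has a unique identifier and can contain multiple steps that run sequentially or conditionally. We can configure jobs with XML or Java configuration, and we typically launch them with a JobLauncher.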

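Running multiple jobs is useful in scenarios such as:

  • ✅ Parallel processing
  • ✅ Data migration and ETL pipelines
  • ✅ Report generation

Managing multiple jobs efficiently is essential for good performance, maintainability, and scalability. Next, we look at the different ways to run multiple jobs in Spring Batch.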

3. Basic Setup

First, we configure the dependencies:

<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-batch</artifactId>
    <version>3.3.2</version> 
</dependency> 
<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-web</artifactId>
    <version>3.3.2</version>
</dependency> 
<dependency> 
    <groupId>com.h2database</groupId> 
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
    <version>2.2.224</version> 
</dependency>

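We've added:

  • spring-boot-starter-web: the basic web dependency
  • spring-boot-starter-batch: the core batch-processing dependency
  • h2: an in-memory database

Next, we enable batch processing and configure the data source: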

@Configuration
@EnableBatchProcessing
public class BatchConfig {
    @Bean
    public DataSource dataSource() {
        return DataSourceBuilder.create()
          .driverClassName("org.h2.Driver")
          .url("jdbc:h2:mem:batchdb;DB_CLOSE_DELAY=-1;")
          .username("sa")
          .password("")
          .build();
    }

    @Bean
    public DatabasePopulator databasePopulator(DataSource dataSource) {
        ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.addScript(new ClassPathResource("org/springframework/batch/core/schema-h2.sql"));
        populator.setContinueOnError(false);
        populator.execute(dataSource);
        return populator;
    }
}

Now we create two sample jobs, each performing a simple operation:

@Configuration
public class JobsConfig {
    private static final Logger log = LoggerFactory.getLogger(JobsConfig.class);

    @Bean
    public Job jobOne(JobRepository jobRepository, Step stepOne) {
        return new JobBuilder("jobOne", jobRepository).start(stepOne)
          .build();
    }

    @Bean
    public Step stepOne(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("stepOne", jobRepository).tasklet((contribution, chunkContext) -> {
              log.info("Hello");
              return RepeatStatus.FINISHED;
          }, transactionManager)
          .build();
    }

    @Bean
    public Job jobTwo(JobRepository jobRepository, Step stepTwo) {
        return new JobBuilder("jobTwo", jobRepository).start(stepTwo)
          .build();
    }

    @Bean
    public Step stepTwo(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("stepTwo", jobRepository).tasklet((contribution, chunkContext) -> {
              log.info("World");
              return RepeatStatus.FINISHED;
          }, transactionManager)
          .build();
    }
}

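The @EnableBatchProcessing annotation automatically configures core components such as JobLauncher, JobRepository, and JobExplorer.

We've defined two independent jobs, jobOne and jobTwo, each with its own step. Each step is a simple tasklet that runs inside a transaction and logs a message so we can confirm that it executed.

Let's verify the job definitions: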

@Autowired
private Job jobOne;

@Autowired
private Job jobTwo;

@Test
void givenJobsDefinitions_whenJobsLoaded_thenJobNamesShouldMatch() {
    assertNotNull(jobOne, "jobOne should be defined");
    assertEquals("jobOne", jobOne.getName());

    assertNotNull(jobTwo, "jobTwo should be defined");
    assertEquals("jobTwo", jobTwo.getName());
}

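4. Running Jobs Sequentially

Running jobs sequentially is the natural choice when they must execute in a fixed order, especially when one job depends on another. Here's an example: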

@Component
public class SequentialJobsConfig {
    @Autowired
    private Job jobOne;

    @Autowired
    private Job jobTwo;

    @Autowired
    private JobLauncher jobLauncher;

    public void runJobsSequentially() {
        JobParameters jobParameters = new JobParametersBuilder().addString("ID", "Sequential 1")
          .toJobParameters();

        JobParameters jobParameters2 = new JobParametersBuilder().addString("ID", "Sequential 2")
          .toJobParameters();

        // Run the jobs one after the other
        try {
            jobLauncher.run(jobOne, jobParameters);
            jobLauncher.run(jobTwo, jobParameters2);
        } catch (Exception e) {
            // Exception handling
            e.printStackTrace();
        }
    }
}

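We create a SequentialJobsConfig component that injects both jobs and runs them through the JobLauncher. Each launch passes a unique ID parameter, added with addString(), so that every run creates a distinct job instance. This approach lets us control the execution flow and inspect the result of one job before launching the next.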
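The idea of checking one job's result before launching the next can be made concrete. In Spring Batch, jobLauncher.run() returns a JobExecution whose getStatus() reports the outcome. The sketch below models that gate in plain Java so it runs without a Spring context. SequentialGateSketch, its Status enum, and runInOrder() are hypothetical names used only for illustration and are not part of Spring Batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model: each Supplier stands in for one jobLauncher.run(...) call
// and returns the status that call would report.
class SequentialGateSketch {

    enum Status { COMPLETED, FAILED }

    /** Runs the jobs in iteration order and stops after the first one that does not complete. */
    static List<String> runInOrder(Map<String, Supplier<Status>> jobs) {
        List<String> launched = new ArrayList<>();
        for (Map.Entry<String, Supplier<Status>> job : jobs.entrySet()) {
            launched.add(job.getKey());
            if (job.getValue().get() != Status.COMPLETED) {
                break; // a dependent job should not start after a failure
            }
        }
        return launched;
    }
}
```

Passing a LinkedHashMap keeps the launch order predictable. With a failing jobOne, only jobOne appears in the returned list and jobTwo is never launched.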

Let's verify sequential execution:

@Autowired
private SequentialJobsConfig sequentialJobsConfig;

@Test
void givenSequentialJobs_whenExecuted_thenRunJobsInOrder() {
    assertDoesNotThrow(() -> sequentialJobsConfig.runJobsSequentially(), "Sequential job execution should execute");
}

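5. Running Jobs in Parallel

When jobs don't depend on each other, running them in parallel can significantly improve performance. We can do this with Spring's TaskExecutor interface: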

@Component
public class ParallelJobService {
    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job jobOne;

    @Autowired
    private Job jobTwo;

    public void runJobsInParallel() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();

        taskExecutor.execute(() -> {
            try {
                jobLauncher.run(jobOne, new JobParametersBuilder().addString("ID", "Parallel 1")
                  .toJobParameters());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        taskExecutor.execute(() -> {
            try {
                jobLauncher.run(jobTwo, new JobParametersBuilder().addString("ID", "Parallel 2")
                  .toJobParameters());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        taskExecutor.close();
    }
}

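Here we use a SimpleAsyncTaskExecutor to launch each job through the JobLauncher on its own thread. ⚠️ When running jobs in parallel, we need to consider:

  • Thread safety
  • Resource contention
  • Transaction management

Addressing these keeps execution stable and efficient.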
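The service above launches both jobs and only prints a stack trace on failure, so the caller cannot tell when the jobs finish or which one failed. One possible way to get that information is to wait on futures. The sketch below uses only the JDK: each Callable stands in for a jobLauncher.run(...) call, and ParallelLaunchSketch and runAll() are hypothetical names, not Spring Batch APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: each Callable stands in for one jobLauncher.run(...) call.
class ParallelLaunchSketch {

    /** Submits every task at once, waits for all of them, and returns the names that failed. */
    static List<String> runAll(Map<String, Callable<Void>> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            // Submit everything first so the tasks can actually overlap.
            Map<String, Future<Void>> futures = new LinkedHashMap<>();
            for (Map.Entry<String, Callable<Void>> task : tasks.entrySet()) {
                futures.put(task.getKey(), pool.submit(task.getValue()));
            }
            List<String> failed = new ArrayList<>();
            for (Map.Entry<String, Future<Void>> future : futures.entrySet()) {
                try {
                    future.getValue().get(); // blocks until this task has finished
                } catch (ExecutionException e) {
                    failed.add(future.getKey()); // the task threw; record its name
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for " + future.getKey(), e);
                }
            }
            return failed;
        } finally {
            pool.shutdown(); // accept no new tasks; idle workers exit
        }
    }
}
```

The method returns an empty list when every task succeeds, so a caller can decide whether to retry, alert, or continue.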

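6. Scheduling Jobs

Sometimes we need to run jobs at a specific time or at regular intervals, which calls for job scheduling. We can do this with Spring's scheduling support or with an external scheduler.

6.1 Using Spring @Scheduled

The @Scheduled annotation lets a method (and therefore a job) run repeatedly on a defined schedule. It must be enabled with the @EnableScheduling annotation.

Let's create a class for the scheduled jobs: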

@Configuration
@EnableScheduling
public class ScheduledJobs {
    private static final Logger log = LoggerFactory.getLogger(ScheduledJobs.class);

    @Autowired
    private Job jobOne;

    @Autowired
    private Job jobTwo;

    @Autowired
    private JobLauncher jobLauncher;

    @Scheduled(cron = "0 */1 * * * *")  // runs every minute
    public void runJob1() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
          .addString("jobID", String.valueOf(System.currentTimeMillis()))
          .toJobParameters();

        log.info("Executing scheduled job 1");
        jobLauncher.run(jobOne, jobParameters);
    }

    @Scheduled(fixedRate = 1000 * 60 * 3)  // runs every 3 minutes
    public void runJob2() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
          .addString("jobID", String.valueOf(System.currentTimeMillis()))
          .toJobParameters();

        log.info("Executing scheduled job 2");
        jobLauncher.run(jobTwo, jobParameters);
    }
}

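We've configured jobOne to run every minute and jobTwo to run every three minutes. @Scheduled accepts either a fixed rate or a cron expression, which covers schedules from simple to complex.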
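The two attributes differ in one respect worth spelling out: a cron expression is aligned to the wall clock, while fixedRate counts from the first run. The sketch below computes both with java.time, assuming the first fixed-rate run happens at application startup (no initialDelay configured). ScheduleTimingSketch and its methods are hypothetical helpers for illustration, not Spring APIs.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper that mirrors the two schedules used in ScheduledJobs.
class ScheduleTimingSketch {

    /** Next firing of cron "0 * /1 * * * *" (written without the space): second 0 of the next minute. */
    static LocalDateTime nextTopOfMinute(LocalDateTime now) {
        return now.truncatedTo(ChronoUnit.MINUTES).plusMinutes(1);
    }

    /** Start time of the n-th repeat of a fixed-rate task, counted from its first run. */
    static LocalDateTime nthFixedRateRun(LocalDateTime firstRun, Duration period, int n) {
        return firstRun.plus(period.multipliedBy(n));
    }
}
```

For an application started at 10:00:42, the cron-driven job next fires at 10:01:00, while the first repeat of the three-minute fixed-rate job starts at 10:03:42.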

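6.2 Using the Quartz Scheduler

Quartz is a powerful job-scheduling library for Java applications. Like @Scheduled, it can run multiple jobs periodically. First, we add the dependency: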

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
    <version>3.3.2</version>
</dependency>

Let's create two job classes:

@Component
public class QuartzJobOne implements Job {
    private static final Logger log = LoggerFactory.getLogger(QuartzJobOne.class);
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            log.info("Job One is executing from quartz");
        } catch (Exception e) {
            log.error("Error executing Job One: {}", e.getMessage(), e);
            throw new JobExecutionException(e);
        }
    }
}

@Component
public class QuartzJobTwo implements Job {
    private static final Logger log = LoggerFactory.getLogger(QuartzJobTwo.class);

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            log.info("Job Two is executing from quartz");
        } catch (Exception e) {
            log.error("Error executing Job Two: {}", e.getMessage(), e);
            throw new JobExecutionException(e);
        }
    }
}

We define a JobDetail and a Trigger for each job:

@Configuration
public class QuartzConfig {
    @Autowired
    private Job quartzJobOne;

    @Autowired
    private Job quartzJobTwo;

    @Bean
    public JobDetail job1Detail() {
        return JobBuilder.newJob().ofType(quartzJobOne.getClass())
          .withIdentity("quartzJobOne", "group1")
          .storeDurably()
          .build();
    }

    @Bean
    public JobDetail job2Detail() {
        return JobBuilder.newJob().ofType(quartzJobTwo.getClass())
          .withIdentity("quartzJobTwo", "group1")
          .storeDurably()
          .build();
    }

    @Bean
    public Trigger job1Trigger(JobDetail job1Detail) {
        return TriggerBuilder.newTrigger()
          .forJob(job1Detail)
          .withIdentity("quartzJobOneTrigger", "group1")
          .withSchedule(CronScheduleBuilder.cronSchedule("0/10 * * * * ?"))
          .build();
    }

    @Bean
    public Trigger job2Trigger(JobDetail job2Detail) {
        return TriggerBuilder.newTrigger()
          .forJob(job2Detail)
          .withIdentity("quartzJobTwoTrigger", "group1")
          .withSchedule(CronScheduleBuilder.cronSchedule("0/15 * * * * ?"))
          .build();
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() {
        SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
        schedulerFactory.setJobDetails(job1Detail(), job2Detail());
        schedulerFactory.setTriggers(job1Trigger(job1Detail()), job2Trigger(job2Detail()));
        return schedulerFactory;
    }
}

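We use Quartz's JobBuilder to create each JobDetail, and TriggerBuilder with a cron expression to define when it fires (every 10 seconds and every 15 seconds, respectively). The SchedulerFactoryBean registers the job details and triggers, so the scheduler starts the jobs automatically.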
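To see what the two cron expressions mean: in a Quartz cron field, start/step means "begin at start and fire every step units". The sketch below expands the seconds field of each expression and finds the seconds at which both triggers fire together. CronSecondsSketch is a hypothetical helper that handles only the start/step form of the seconds field; it is not a general cron parser and not part of Quartz.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: expands only the "start/step" form of a cron seconds field.
class CronSecondsSketch {

    /** Returns the seconds within a minute at which a field such as "0/10" fires. */
    static List<Integer> expand(String secondsField) {
        String[] parts = secondsField.split("/");
        int start = Integer.parseInt(parts[0]);
        int step = Integer.parseInt(parts[1]);
        List<Integer> seconds = new ArrayList<>();
        for (int second = start; second < 60; second += step) {
            seconds.add(second);
        }
        return seconds;
    }

    /** Returns the seconds at which both fields fire, i.e. when the two jobs start together. */
    static List<Integer> overlap(String first, String second) {
        List<Integer> common = expand(first);
        common.retainAll(expand(second));
        return common;
    }
}
```

Calling overlap("0/10", "0/15") shows that the two triggers coincide twice a minute, which is when the two Quartz jobs compete for scheduler threads.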

Quartz is highly flexible and supports complex scheduling scenarios, but it takes more configuration than @Scheduled.

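7. Dynamic Job Execution

All the approaches so far rely on statically predefined jobs. Sometimes, though, we need to create jobs dynamically based on runtime conditions. Spring Batch supports this with either chunk-oriented or tasklet-based steps; here we use the chunk-oriented approach.

In chunk-oriented processing, data is read by an ItemReader, processed by an ItemProcessor, and finally written by an ItemWriter.

Let's create a service for dynamic jobs: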

@Service
public class DynamicJobService {
    private final JobRepository jobRepository;
    private final JobLauncher jobLauncher;
    private final PlatformTransactionManager transactionManager;

    public DynamicJobService(JobRepository jobRepository, JobLauncher jobLauncher, PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.jobLauncher = jobLauncher;
        this.transactionManager = transactionManager;
    }

    public void createAndRunJob(Map<String, List<String>> jobsData) throws Exception {
        List<Job> jobs = new ArrayList<>();

        // Create the chunk-oriented jobs
        for (Map.Entry<String, List<String>> entry : jobsData.entrySet()) {
            if (entry.getValue() instanceof List) {
                jobs.add(createJob(entry.getKey(), entry.getValue()));
            }
        }

        // Run all the jobs
        for (Job job : jobs) {
            JobParameters jobParameters = new JobParametersBuilder().addString("jobID", String.valueOf(System.currentTimeMillis()))
              .toJobParameters();
            jobLauncher.run(job, jobParameters);
        }
    }

    private Job createJob(String jobName, List<String> data) {
        return new JobBuilder(jobName, jobRepository).start(createStep(data))
          .build();
    }

    private Step createStep(List<String> data) {
        return new StepBuilder("step", jobRepository).<String, String> chunk(10, transactionManager)
          .reader(new ListItemReader<>(data))
          .processor(item -> item.toUpperCase())
          .writer(items -> items.forEach(System.out::println))
          .build();
    }
}

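The createAndRunJob() method builds jobs dynamically from the input data and runs them. The flow is:

  1. reader() reads items one at a time from the input list
  2. processor() converts each item to uppercase
  3. The processed items are collected into a chunk (size 10)
  4. writer() prints the chunk contents to the console
  5. The cycle repeats until all data has been processed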
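The steps above can be sketched as a plain Java loop. This is a simplified model of the read, process, write cycle and not Spring Batch's implementation: a real chunk-oriented step also handles transactions, restarts, skips, and retries. ChunkLoopSketch and run() are hypothetical names, with an Iterator, a Function, and a Consumer standing in for ItemReader, ItemProcessor, and ItemWriter.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of the chunk cycle; not Spring Batch code.
class ChunkLoopSketch {

    /** Reads up to chunkSize items, processes them, writes them as one chunk, and repeats. */
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        int chunksWritten = 0;
        while (reader.hasNext()) {
            // Read phase: pull items one at a time until the chunk is full or input ends.
            List<I> read = new ArrayList<>();
            while (read.size() < chunkSize && reader.hasNext()) {
                read.add(reader.next());
            }
            // Process phase: transform each item that was read.
            List<O> processed = new ArrayList<>();
            for (I item : read) {
                processed.add(processor.apply(item));
            }
            // Write phase: hand the whole chunk to the writer in a single call.
            writer.accept(processed);
            chunksWritten++;
        }
        return chunksWritten;
    }
}
```

With three input items and a chunk size of 10, as in the test data for this service, the writer is called exactly once; with a chunk size of 2 it is called twice.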

Let's test the dynamic jobs:

@Autowired
private DynamicJobService dynamicJobService;

@Test
void givenJobData_whenJobsCreated_thenJobsRunSuccessfully() throws Exception {
    Map<String, List<String>> jobsData = new HashMap<>();
    jobsData.put("chunkJob1", Arrays.asList("data1", "data2", "data3"));
    jobsData.put("chunkJob2", Arrays.asList("data4", "data5", "data6"));
    assertDoesNotThrow(() -> dynamicJobService.createAndRunJob(jobsData), "Dynamic job creation and execution should run successfully");
}

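Real projects usually need more complex processing logic. When the built-in implementations don't meet our needs, we can write a custom ItemReader, ItemProcessor, or ItemWriter.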

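8. Conclusion

In this article, we explored several ways to run multiple jobs in Spring Batch. With these basic examples in hand, we can design batch systems that are more efficient, scalable, and maintainable.

Which approach to choose depends on the requirements:

  • ✅ Sequential execution: chains of jobs with dependencies
  • ✅ Parallel execution: independent jobs with no dependencies
  • ✅ Scheduled execution: periodic or timed jobs
  • ✅ Dynamic execution: jobs created from runtime conditions

The complete implementation is available in the GitHub repository.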


Original title: How to Run Multiple Jobs in Spring Batch | Baeldung