2. Spring Batch Job Basics
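Spring Batch is a powerful batch-processing framework that provides reusable components and reliable infrastructure, which makes processing large volumes of data much easier. In practice, we often need to run several jobs, sometimes concurrently and sometimes in a specific order, to optimize performance and manage the dependencies between them.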
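In Spring Batch, a Job is a container for Steps and represents the entire processing flow. Each job has a unique name and can contain multiple steps that run sequentially or conditionally. Jobs can be configured in XML or Java and are usually started with a JobLauncher.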
Running multiple jobs is useful in scenarios such as:
- ✅ Parallel processing
- ✅ Data migration and ETL pipelines
- ✅ Report generation, and so on
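Managing multiple jobs efficiently is key to good performance, maintainability and scalability. Let's look at the different approaches to running multiple jobs in Spring Batch.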
3. Basic Setup
First, let's set up the dependencies:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
<version>2.2.224</version>
</dependency>
We added:
- spring-boot-starter-web: basic web support
- spring-boot-starter-batch: core batch-processing support
- h2: an in-memory database
Next, we enable batch processing and configure the data source:
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Bean
public DataSource dataSource() {
return DataSourceBuilder.create()
.driverClassName("org.h2.Driver")
.url("jdbc:h2:mem:batchdb;DB_CLOSE_DELAY=-1;")
.username("sa")
.password("")
.build();
}
@Bean
public DatabasePopulator databasePopulator(DataSource dataSource) {
ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
populator.addScript(new ClassPathResource("org/springframework/batch/core/schema-h2.sql"));
populator.setContinueOnError(false);
populator.execute(dataSource);
return populator;
}
}
Now let's create two sample jobs, each performing a simple operation:
@Configuration
public class JobsConfig {
private static final Logger log = LoggerFactory.getLogger(JobsConfig.class);
@Bean
public Job jobOne(JobRepository jobRepository, Step stepOne) {
return new JobBuilder("jobOne", jobRepository).start(stepOne)
.build();
}
@Bean
public Step stepOne(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("stepOne", jobRepository).tasklet((contribution, chunkContext) -> {
log.info("Hello");
return RepeatStatus.FINISHED;
}, transactionManager)
.build();
}
@Bean
public Job jobTwo(JobRepository jobRepository, Step stepTwo) {
return new JobBuilder("jobTwo", jobRepository).start(stepTwo)
.build();
}
@Bean
public Step stepTwo(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("stepTwo", jobRepository).tasklet((contribution, chunkContext) -> {
log.info("World");
return RepeatStatus.FINISHED;
}, transactionManager)
.build();
}
}
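The @EnableBatchProcessing annotation sets up core batch components such as JobLauncher, JobRepository and JobExplorer. In Spring Boot 3, using this annotation also makes Boot's batch auto-configuration back off, including its schema initialization, so BatchConfig runs the schema-h2.sql script explicitly.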
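We define two independent jobs, jobOne and jobTwo, each with its own step. The steps are simple tasklets that run with transaction support and log a message so we can confirm they executed.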
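Both steps return RepeatStatus.FINISHED from their tasklet. In Spring Batch, FINISHED ends the step, whereas CONTINUABLE makes the step invoke the tasklet again. The following plain-Java sketch models only that repeat contract, with no transactions or job repository; TaskletStatus, SimpleTasklet and TaskletRunner are hypothetical stand-ins, not Spring Batch types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Spring Batch's RepeatStatus; not the real type.
enum TaskletStatus { CONTINUABLE, FINISHED }

// Hypothetical single-method contract, loosely modelled on Tasklet.execute().
interface SimpleTasklet {
    TaskletStatus execute();
}

class TaskletRunner {
    // Calls the tasklet until it reports FINISHED; returns how many calls were made.
    static int runUntilFinished(SimpleTasklet tasklet) {
        int calls = 0;
        TaskletStatus status;
        do {
            status = tasklet.execute();
            calls++;
        } while (status == TaskletStatus.CONTINUABLE);
        return calls;
    }

    public static void main(String[] args) {
        // Like stepOne/stepTwo: log once and finish immediately.
        int single = runUntilFinished(() -> {
            System.out.println("Hello");
            return TaskletStatus.FINISHED;
        });

        // A tasklet that drains a small work list, one item per call.
        List<String> work = new ArrayList<>(List.of("a", "b", "c"));
        int drained = runUntilFinished(() -> {
            System.out.println("processing " + work.remove(0));
            return work.isEmpty() ? TaskletStatus.FINISHED : TaskletStatus.CONTINUABLE;
        });

        System.out.println(single + " " + drained); // prints "1 3"
    }
}
```

Returning FINISHED, as both steps in JobsConfig do, is the usual choice for one-shot work such as logging a message.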
Let's verify the job definitions:
@Autowired
private Job jobOne;
@Autowired
private Job jobTwo;
@Test
void givenJobsDefinitions_whenJobsLoaded_thenJobNamesShouldMatch() {
assertNotNull(jobOne, "jobOne should be defined");
assertEquals("jobOne", jobOne.getName());
assertNotNull(jobTwo, "jobTwo should be defined");
assertEquals("jobTwo", jobTwo.getName());
}
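4. Running Jobs Sequentially
Sequential execution is the right choice when jobs must run in a particular order, especially when one depends on another. Here's an example: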
@Component
public class SequentialJobsConfig {
@Autowired
private Job jobOne;
@Autowired
private Job jobTwo;
@Autowired
private JobLauncher jobLauncher;
public void runJobsSequentially() {
JobParameters jobParameters = new JobParametersBuilder().addString("ID", "Sequential 1")
.toJobParameters();
JobParameters jobParameters2 = new JobParametersBuilder().addString("ID", "Sequential 2")
.toJobParameters();
// Run the jobs one after the other
try {
jobLauncher.run(jobOne, jobParameters);
jobLauncher.run(jobTwo, jobParameters2);
} catch (Exception e) {
// Handle the exception (this example just prints the stack trace)
e.printStackTrace();
}
}
}
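We create a SequentialJobsConfig component that injects both jobs and runs them through the JobLauncher. Adding a unique ID parameter with addString() ensures that each launch creates a new job instance. JobLauncher.run() returns a JobExecution, and when the launcher runs synchronously (TaskExecutorJobLauncher's behavior when no TaskExecutor is set) it returns only after the job has finished, so this approach lets us control the execution flow and check the outcome of one job before starting the next.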
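The paragraph above mentions checking one job's result before starting the next. With the real API, this means comparing the returned JobExecution's getStatus() with BatchStatus.COMPLETED. The sketch below shows only that stop-on-failure control flow in plain Java; JobOutcome, SequentialRunner and runInOrder are hypothetical names, and the Suppliers stand in for the jobLauncher.run(...) calls.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical outcome type, loosely mirroring BatchStatus.COMPLETED / FAILED.
enum JobOutcome { COMPLETED, FAILED }

class SequentialRunner {
    // Runs jobs in insertion order and stops at the first one that does not complete.
    // Returns the names of the jobs that were actually started.
    static List<String> runInOrder(Map<String, Supplier<JobOutcome>> jobs) {
        List<String> started = new ArrayList<>();
        for (Map.Entry<String, Supplier<JobOutcome>> job : jobs.entrySet()) {
            started.add(job.getKey());
            JobOutcome outcome = job.getValue().get();
            if (outcome != JobOutcome.COMPLETED) {
                System.out.println(job.getKey() + " ended with " + outcome + "; skipping the rest");
                break;
            }
        }
        return started;
    }

    public static void main(String[] args) {
        // LinkedHashMap preserves the order in which the jobs should run.
        Map<String, Supplier<JobOutcome>> jobs = new LinkedHashMap<>();
        jobs.put("jobOne", () -> JobOutcome.FAILED);
        jobs.put("jobTwo", () -> JobOutcome.COMPLETED);
        System.out.println(runInOrder(jobs)); // prints [jobOne]
    }
}
```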
Let's verify the sequential execution:
@Autowired
private SequentialJobsConfig sequentialJobsConfig;
@Test
void givenSequentialJobs_whenExecuted_thenRunJobsInOrder() {
assertDoesNotThrow(() -> sequentialJobsConfig.runJobsSequentially(), "Sequential job execution should execute");
}
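The unique ID parameter matters because Spring Batch identifies a job instance by the job name plus its identifying job parameters, and launching an instance that has already completed with the same parameters is rejected with a JobInstanceAlreadyCompleteException. The following plain-Java model of that bookkeeping is only an illustration: InstanceRegistry and launch() are hypothetical, and the model treats every launch as successful, so it does not cover restarting a failed instance.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class InstanceRegistry {
    // Completed instances, keyed by job name plus sorted identifying parameters.
    private final Set<String> completed = new HashSet<>();

    private static String key(String jobName, Map<String, String> params) {
        // TreeMap gives a stable, order-independent representation of the parameters.
        return jobName + "|" + new TreeMap<>(params);
    }

    // Returns true and records the run if this name + parameters combination is new;
    // returns false for a repeat, where Spring Batch would refuse to relaunch.
    boolean launch(String jobName, Map<String, String> params) {
        return completed.add(key(jobName, params));
    }

    public static void main(String[] args) {
        InstanceRegistry registry = new InstanceRegistry();
        System.out.println(registry.launch("jobOne", Map.of("ID", "Sequential 1"))); // true
        System.out.println(registry.launch("jobOne", Map.of("ID", "Sequential 1"))); // false
        System.out.println(registry.launch("jobOne", Map.of("ID", "Sequential 3"))); // true
    }
}
```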
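5. Running Jobs in Parallel
When jobs don't depend on each other, running them in parallel can significantly improve performance. We can do this with Spring's TaskExecutor interface: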
@Component
public class ParallelJobService {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job jobOne;
@Autowired
private Job jobTwo;
public void runJobsInParallel() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.execute(() -> {
try {
jobLauncher.run(jobOne, new JobParametersBuilder().addString("ID", "Parallel 1")
.toJobParameters());
} catch (Exception e) {
e.printStackTrace();
}
});
taskExecutor.execute(() -> {
try {
jobLauncher.run(jobTwo, new JobParametersBuilder().addString("ID", "Parallel 2")
.toJobParameters());
} catch (Exception e) {
e.printStackTrace();
}
});
taskExecutor.close();
}
}
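Here we use a SimpleAsyncTaskExecutor to launch each job through the JobLauncher on its own thread. ⚠️ With the parallel approach, keep the following in mind to ensure stable and efficient execution:
- Thread safety
- Resource contention
- Transaction management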
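Nothing in runJobsInParallel() collects the job results. If the caller needs to know when both jobs are done, a common plain-Java pattern is to submit the work to an ExecutorService and join on the resulting futures. In this sketch the Runnables stand in for the jobLauncher.run(...) calls, and ParallelRunner and runAllAndWait are hypothetical names; it illustrates the waiting pattern rather than the article's implementation.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelRunner {
    // Runs every task on a fixed-size pool and blocks until all of them have finished.
    static void runAllAndWait(List<Runnable> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            CompletableFuture<?>[] futures = tasks.stream()
                    .map(task -> CompletableFuture.runAsync(task, pool))
                    .toArray(CompletableFuture[]::new);
            // join() waits for all tasks and rethrows a task failure as CompletionException.
            CompletableFuture.allOf(futures).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> finished = new ConcurrentLinkedQueue<>();
        List<Runnable> jobs = List.of(
                () -> finished.add("jobOne"),  // stands in for jobLauncher.run(jobOne, ...)
                () -> finished.add("jobTwo")); // stands in for jobLauncher.run(jobTwo, ...)
        runAllAndWait(jobs);
        // Both jobs are done here, but their completion order is not guaranteed.
        System.out.println(finished.size()); // prints 2
    }
}
```

A thread-safe collection is used for the results because both tasks write to it concurrently, which is the same thread-safety concern listed above.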
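6. Scheduling Job Execution
Sometimes we need jobs to run at specific times or at regular intervals. This is where scheduling comes in; we can use Spring's scheduling support or an external scheduler.
6.1 Using Spring's @Scheduled
The @Scheduled annotation lets a method run repeatedly on a defined schedule. Scheduling must be enabled with the @EnableScheduling annotation.
Let's create a class with the scheduled jobs: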
@Configuration
@EnableScheduling
public class ScheduledJobs {
private static final Logger log = LoggerFactory.getLogger(ScheduledJobs.class);
@Autowired
private Job jobOne;
@Autowired
private Job jobTwo;
@Autowired
private JobLauncher jobLauncher;
@Scheduled(cron = "0 */1 * * * *") // runs at second 0 of every minute
public void runJob1() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobID", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
log.info("Executing scheduled job 1");
jobLauncher.run(jobOne, jobParameters);
}
@Scheduled(fixedRate = 1000 * 60 * 3) // runs every 3 minutes (180,000 ms)
public void runJob2() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobID", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
log.info("Executing scheduled job 2");
jobLauncher.run(jobTwo, jobParameters);
}
}
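We configure jobOne to run every minute and jobTwo every three minutes. @Scheduled supports both fixed rates and cron expressions, covering everything from simple to complex scheduling patterns. Each run passes the current timestamp as the jobID parameter, so every trigger creates a new job instance.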
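The two triggers behave differently: the cron expression 0 */1 * * * * (Spring's six-field format, which starts with seconds) fires at second 0 of every wall-clock minute, while fixedRate measures the period from the start of the previous invocation. The plain-Java sketch below computes the next fire time under each rule with java.time; nextCronMinute and nextFixedRate are hypothetical helpers, not Spring APIs, and they ignore time zones and the first run after startup.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

class TriggerMath {
    // "0 */1 * * * *": second 0 of every minute, so the next fire is the next whole minute
    // strictly after 'now'.
    static LocalDateTime nextCronMinute(LocalDateTime now) {
        return now.truncatedTo(ChronoUnit.MINUTES).plusMinutes(1);
    }

    // fixedRate: the next start is the previous start plus the period.
    static LocalDateTime nextFixedRate(LocalDateTime previousStart, Duration period) {
        return previousStart.plus(period);
    }

    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(2024, 8, 1, 10, 0, 30);
        System.out.println(nextCronMinute(t));                                  // 2024-08-01T10:01
        System.out.println(nextFixedRate(t, Duration.ofMillis(1000 * 60 * 3))); // 2024-08-01T10:03:30
    }
}
```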
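6.2 Using the Quartz Scheduler
Quartz is a powerful job-scheduling library for Java applications. Like @Scheduled, it can run multiple jobs periodically. First, we add the dependency: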
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
<version>3.3.2</version>
</dependency>
Then we create two job classes:
@Component
public class QuartzJobOne implements Job {
private static final Logger log = LoggerFactory.getLogger(QuartzJobOne.class);
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
try {
log.info("Job One is executing from quartz");
} catch (Exception e) {
log.error("Error executing Job One: {}", e.getMessage(), e);
throw new JobExecutionException(e);
}
}
}
@Component
public class QuartzJobTwo implements Job {
private static final Logger log = LoggerFactory.getLogger(QuartzJobTwo.class);
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
try {
log.info("Job Two is executing from quartz");
} catch (Exception e) {
log.error("Error executing Job Two: {}", e.getMessage(), e);
throw new JobExecutionException(e);
}
}
}
Next, we define a JobDetail and a Trigger for each job:
@Configuration
public class QuartzConfig {
@Autowired
private Job quartzJobOne;
@Autowired
private Job quartzJobTwo;
@Bean
public JobDetail job1Detail() {
return JobBuilder.newJob().ofType(quartzJobOne.getClass())
.withIdentity("quartzJobOne", "group1")
.storeDurably()
.build();
}
@Bean
public JobDetail job2Detail() {
return JobBuilder.newJob().ofType(quartzJobTwo.getClass())
.withIdentity("quartzJobTwo", "group1")
.storeDurably()
.build();
}
@Bean
public Trigger job1Trigger(JobDetail job1Detail) {
return TriggerBuilder.newTrigger()
.forJob(job1Detail)
.withIdentity("quartzJobOneTrigger", "group1")
.withSchedule(CronScheduleBuilder.cronSchedule("0/10 * * * * ?"))
.build();
}
@Bean
public Trigger job2Trigger(JobDetail job2Detail) {
return TriggerBuilder.newTrigger()
.forJob(job2Detail)
.withIdentity("quartzJobTwoTrigger", "group1")
.withSchedule(CronScheduleBuilder.cronSchedule("0/15 * * * * ?"))
.build();
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean() {
SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
schedulerFactory.setJobDetails(job1Detail(), job2Detail());
schedulerFactory.setTriggers(job1Trigger(job1Detail()), job2Trigger(job2Detail()));
return schedulerFactory;
}
}
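We use JobBuilder to create the JobDetail instances and TriggerBuilder with cron expressions to define when they fire (every 10 and 15 seconds, respectively). Note that Job and JobBuilder here come from Quartz (org.quartz), not from Spring Batch. The schedulerFactoryBean registers both jobs and their triggers, and the scheduler starts them automatically.
Quartz is extremely flexible and supports complex scheduling scenarios, but it requires more configuration than @Scheduled.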
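In Quartz cron expressions the first field is seconds, and 0/10 means "starting at second 0, every 10 seconds". The sketch below expands such a seconds field into the seconds of each minute at which a trigger fires, and shows that the two triggers above coincide at seconds 0 and 30, so both jobs may fire at the same instant. It handles only the start/step form, and expandSeconds is a hypothetical helper, not part of Quartz.

```java
import java.util.ArrayList;
import java.util.List;

class QuartzSeconds {
    // Expands a seconds field of the form "start/step" (e.g. "0/10") into fire seconds 0..59.
    static List<Integer> expandSeconds(String field) {
        String[] parts = field.split("/");
        int start = Integer.parseInt(parts[0]);
        int step = Integer.parseInt(parts[1]);
        List<Integer> seconds = new ArrayList<>();
        for (int s = start; s < 60; s += step) {
            seconds.add(s);
        }
        return seconds;
    }

    public static void main(String[] args) {
        List<Integer> jobOne = expandSeconds("0/10"); // [0, 10, 20, 30, 40, 50]
        List<Integer> jobTwo = expandSeconds("0/15"); // [0, 15, 30, 45]
        List<Integer> both = new ArrayList<>(jobOne);
        both.retainAll(jobTwo); // seconds at which both triggers fire
        System.out.println(both); // prints [0, 30]
    }
}
```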
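7. Dynamic Job Execution
The approaches so far all require jobs to be defined statically in advance. Sometimes, however, we need to create jobs dynamically based on runtime conditions. Both the chunk-oriented and the tasklet approach in Spring Batch can do this; here we use the chunk-oriented approach.
In the chunk-oriented model, data is read by an ItemReader, processed by an ItemProcessor, and finally written by an ItemWriter.
Let's create a dynamic job service: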
@Service
public class DynamicJobService {
private final JobRepository jobRepository;
private final JobLauncher jobLauncher;
private final PlatformTransactionManager transactionManager;
public DynamicJobService(JobRepository jobRepository, JobLauncher jobLauncher, PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.jobLauncher = jobLauncher;
this.transactionManager = transactionManager;
}
public void createAndRunJob(Map<String, List<String>> jobsData) throws Exception {
List<Job> jobs = new ArrayList<>();
// Build one chunk-oriented job per map entry (entries without data are skipped)
for (Map.Entry<String, List<String>> entry : jobsData.entrySet()) {
if (entry.getValue() != null) {
jobs.add(createJob(entry.getKey(), entry.getValue()));
}
}
// Launch all jobs
for (Job job : jobs) {
JobParameters jobParameters = new JobParametersBuilder().addString("jobID", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
jobLauncher.run(job, jobParameters);
}
}
private Job createJob(String jobName, List<String> data) {
return new JobBuilder(jobName, jobRepository).start(createStep(data))
.build();
}
private Step createStep(List<String> data) {
return new StepBuilder("step", jobRepository).<String, String> chunk(10, transactionManager)
.reader(new ListItemReader<>(data))
.processor(item -> item.toUpperCase())
.writer(items -> items.forEach(System.out::println))
.build();
}
}
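The createAndRunJob method builds a job for each entry of the input map and runs it. Each job works as follows:
- reader() reads items one by one from the input list until a chunk of 10 items has been collected (or the list runs out)
- processor() converts each item in the chunk to uppercase
- writer() prints the processed chunk to the console
- the cycle repeats until all data has been processed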
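The read-process-write cycle described above can be modelled in plain Java as follows. This is a simplified sketch, not Spring Batch's implementation: it follows the ItemReader convention of returning null when the input is exhausted and the ItemProcessor convention that a null result filters the item out, but it has no transactions, retries or restart state, and ChunkLoop and runChunks are hypothetical names. With the chunk size of 10 used above and three items per job, each job writes a single chunk.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

class ChunkLoop {
    // Reads up to chunkSize items, processes them, writes the chunk, and repeats until
    // the reader returns null. Returns the number of chunks written.
    static <I, O> int runChunks(Supplier<I> reader, Function<I, O> processor,
                                Consumer<List<O>> writer, int chunkSize) {
        int chunks = 0;
        while (true) {
            // 1. Read phase: collect up to chunkSize input items.
            List<I> inputs = new ArrayList<>();
            I item;
            while (inputs.size() < chunkSize && (item = reader.get()) != null) {
                inputs.add(item);
            }
            if (inputs.isEmpty()) {
                return chunks;
            }
            // 2. Process phase: a null result filters the item out.
            List<O> outputs = new ArrayList<>();
            for (I input : inputs) {
                O out = processor.apply(input);
                if (out != null) {
                    outputs.add(out);
                }
            }
            // 3. Write phase: the whole chunk is handed to the writer at once.
            writer.accept(outputs);
            chunks++;
        }
    }

    public static void main(String[] args) {
        Iterator<String> data = List.of("data1", "data2", "data3").iterator();
        Supplier<String> reader = () -> data.hasNext() ? data.next() : null; // null = end of input
        Function<String, String> toUpper = String::toUpperCase;
        Consumer<List<String>> print = items -> items.forEach(System.out::println);
        int chunks = runChunks(reader, toUpper, print, 10);
        System.out.println(chunks + " chunk(s)"); // DATA1, DATA2, DATA3, then "1 chunk(s)"
    }
}
```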
Let's test the dynamic jobs:
@Autowired
private DynamicJobService dynamicJobService;
@Test
void givenJobData_whenJobsCreated_thenJobsRunSuccessfully() throws Exception {
Map<String, List<String>> jobsData = new HashMap<>();
jobsData.put("chunkJob1", Arrays.asList("data1", "data2", "data3"));
jobsData.put("chunkJob2", Arrays.asList("data4", "data5", "data6"));
assertDoesNotThrow(() -> dynamicJobService.createAndRunJob(jobsData), "Dynamic job creation and execution should run successfully");
}
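Real projects usually need more complex processing logic. When the built-in implementations aren't enough, consider writing custom ItemReader, ItemProcessor and ItemWriter implementations.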
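8. Conclusion
In this article, we explored several ways to run multiple jobs in Spring Batch. With these basic examples in hand, we can design batch systems that are more efficient, scalable and maintainable.
The right approach depends on the requirements:
- ✅ Sequential execution: chains of jobs with dependencies
- ✅ Parallel execution: independent jobs with no dependencies
- ✅ Scheduled execution: periodic or time-based jobs
- ✅ Dynamic execution: jobs created from runtime conditions
The complete implementation is available in the GitHub repository.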