1. 概述
Spring Batch 是一个强大的框架,用于开发健壮的批处理应用程序。在之前的教程中,我们已经对 Spring Batch 进行了初步介绍。
在本教程中,我们将基于已有知识,学习如何使用 Spring Boot 构建一个简单的批处理驱动的应用程序。
2. Maven 依赖配置
首先,在 pom.xml 中添加 spring-boot-starter-batch 依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.0.0</version>
</dependency>
同时,我们还会引入 h2 数据库依赖,它可以从 Maven Central 获取:
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.1.214</version>
<scope>runtime</scope>
</dependency>
3. 定义一个简单的 Spring Batch Job
✅ 我们的目标是构建一个从 CSV 文件导入咖啡列表的任务,通过自定义处理器进行转换,并将结果存储到内存数据库中。
3.1. 入口程序
我们先定义应用程序的启动类:
@SpringBootApplication
public class SpringBootBatchProcessingApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootBatchProcessingApplication.class, args);
}
}
这是一个标准的 Spring Boot 应用。为了尽可能使用默认配置,我们会保持配置文件简洁。
在 src/main/resources/application.properties
中添加以下配置:
file.input=coffee-list.csv
该属性指定了输入文件的位置。每行数据包含品牌、产地和风味特征:
Blue Mountain,Jamaica,Fruity
Lavazza,Colombia,Strong
Folgers,America,Smokey
由于这是一个标准的 CSV 文件,Spring Batch 可以轻松解析,无需额外定制。
接着,我们添加一个 SQL 脚本 schema-all.sql
来创建存储数据的 coffee
表:
DROP TABLE coffee IF EXISTS;
CREATE TABLE coffee (
coffee_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
brand VARCHAR(20),
origin VARCHAR(20),
characteristics VARCHAR(30)
);
⚠️ Spring Boot 会在启动时自动执行这个脚本。
3.2. Coffee 领域类
接下来,我们创建一个简单的领域类来表示咖啡对象:
public class Coffee {
private String brand;
private String origin;
private String characteristics;
public Coffee(String brand, String origin, String characteristics) {
this.brand = brand;
this.origin = origin;
this.characteristics = characteristics;
}
// getters and setters
}
如上所示,Coffee
类包含三个字段:
- 品牌(brand)
- 产地(origin)
- 特性(characteristics)
4. Job 配置
现在我们进入核心部分:Job 的配置。我们将逐步构建并解释每个组件。
@Configuration
public class BatchConfiguration {
@Value("${file.input}")
private String fileInput;
// ...
}
首先是一个标准的 Spring @Configuration
类。⚠️ 注意:Spring Boot 3.0 不再推荐使用 @EnableBatchProcessing
注解。同时,JobBuilderFactory
和 StepBuilderFactory
已被弃用,建议使用 JobBuilder
和 StepBuilder
并指定名称。
我们还注入了之前定义的 file.input
配置项。
4.1. Reader 和 Writer 配置
定义一个 FlatFileItemReader
Bean 来读取 CSV 文件:
@Bean
public FlatFileItemReader reader() {
return new FlatFileItemReaderBuilder().name("coffeeItemReader")
.resource(new ClassPathResource(fileInput))
.delimited()
.names(new String[] { "brand", "origin", "characteristics" })
.fieldSetMapper(new BeanWrapperFieldSetMapper() {{
setTargetType(Coffee.class);
}})
.build();
}
✅ 这个 reader 会读取 coffee-list.csv
文件,将每一行解析为 Coffee
对象。
然后定义一个 JdbcBatchItemWriter
Bean 用于写入数据库:
@Bean
public JdbcBatchItemWriter writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO coffee (brand, origin, characteristics) VALUES (:brand, :origin, :characteristics)")
.dataSource(dataSource)
.build();
}
该 writer 会将 Coffee
对象中的属性通过 SQL 插入到数据库中。
4.2. 组装 Job
最后,我们定义 Job 和 Step:
@Bean
public Job importUserJob(JobRepository jobRepository, JobCompletionNotificationListener listener, Step step1) {
return new JobBuilder("importUserJob", jobRepository)
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager, JdbcBatchItemWriter writer) {
return new StepBuilder("step1", jobRepository)
.<Coffee, Coffee> chunk(10, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer)
.build();
}
@Bean
public CoffeeItemProcessor processor() {
return new CoffeeItemProcessor();
}
简单说明一下:
step1
定义了一个批处理步骤,每 10 条记录为一个 chunk。- 依次执行:读取 → 处理 → 写入。
importUserJob
是主任务,包含一个 Step,并注册了监听器。
✅ 我们还添加了一个 JobCompletionNotificationListener
,用于在任务完成后执行回调逻辑。
5. 自定义 Processor
下面是我们在 Job 中使用的自定义处理器:
public class CoffeeItemProcessor implements ItemProcessor<Coffee, Coffee> {
private static final Logger LOGGER = LoggerFactory.getLogger(CoffeeItemProcessor.class);
@Override
public Coffee process(final Coffee coffee) throws Exception {
String brand = coffee.getBrand().toUpperCase();
String origin = coffee.getOrigin().toUpperCase();
String chracteristics = coffee.getCharacteristics().toUpperCase();
Coffee transformedCoffee = new Coffee(brand, origin, chracteristics);
LOGGER.info("Converting ( {} ) into ( {} )", coffee, transformedCoffee);
return transformedCoffee;
}
}
✅ 该处理器将 Coffee
对象的每个字段都转换为大写,并记录日志。
6. Job 完成通知
我们定义了一个监听器来监听 Job 的完成事件:
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
LOGGER.info("!!! JOB FINISHED! Time to verify the results");
String query = "SELECT brand, origin, characteristics FROM coffee";
jdbcTemplate.query(query, (rs, row) -> new Coffee(rs.getString(1), rs.getString(2), rs.getString(3)))
.forEach(coffee -> LOGGER.info("Found < {} > in the database.", coffee));
}
}
当 Job 成功完成后,会执行 afterJob
方法,查询数据库并打印结果。
7. 运行 Job
一切就绪后,运行程序,控制台输出如下:
...
17:41:16.336 [main] INFO c.b.b.JobCompletionNotificationListener -
!!! JOB FINISHED! Time to verify the results
17:41:16.336 [main] INFO c.b.b.JobCompletionNotificationListener -
Found < Coffee [brand=BLUE MOUNTAIN, origin=JAMAICA, characteristics=FRUITY] > in the database.
17:41:16.337 [main] INFO c.b.b.JobCompletionNotificationListener -
Found < Coffee [brand=LAVAZZA, origin=COLOMBIA, characteristics=STRONG] > in the database.
17:41:16.337 [main] INFO c.b.b.JobCompletionNotificationListener -
Found < Coffee [brand=FOLGERS, origin=AMERICA, characteristics=SMOKEY] > in the database.
...
✅ Job 成功执行,数据也已正确写入数据库。
8. 总结
本文我们演示了如何使用 Spring Boot 快速搭建一个 Spring Batch 应用:
- 配置基础环境
- 定义 Reader 和 Writer
- 添加自定义 Processor
- 使用 Listener 监听 Job 完成状态
完整代码可以在 GitHub 获取。