1. 概述

Spring Batch 是一个强大的框架,用于开发健壮的批处理应用程序。在之前的教程中,我们已经对 Spring Batch 进行了初步介绍

在本教程中,我们将基于已有知识,学习如何使用 Spring Boot 构建一个简单的批处理驱动的应用程序。

2. Maven 依赖配置

首先,在 pom.xml 中添加 spring-boot-starter-batch 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
    <version>3.0.0</version>
</dependency>

同时,我们还会引入 h2 数据库依赖,它可以从 Maven Central 获取:

<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>2.1.214</version>
    <scope>runtime</scope>
</dependency>

3. 定义一个简单的 Spring Batch Job

我们的目标是构建一个从 CSV 文件导入咖啡列表的任务,通过自定义处理器进行转换,并将结果存储到内存数据库中

3.1. 入口程序

我们先定义应用程序的启动类:

@SpringBootApplication
public class SpringBootBatchProcessingApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootBatchProcessingApplication.class, args);
    }
}

这是一个标准的 Spring Boot 应用。为了尽可能使用默认配置,我们会保持配置文件简洁。

src/main/resources/application.properties 中添加以下配置:

file.input=coffee-list.csv

该属性指定了输入文件的位置。每行数据包含品牌、产地和风味特征:

Blue Mountain,Jamaica,Fruity
Lavazza,Colombia,Strong
Folgers,America,Smokey

由于这是一个标准的 CSV 文件,Spring Batch 可以轻松解析,无需额外定制。

接着,我们添加一个 SQL 脚本 schema-all.sql 来创建存储数据的 coffee 表:

DROP TABLE coffee IF EXISTS;

CREATE TABLE coffee  (
    coffee_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
    brand VARCHAR(20),
    origin VARCHAR(20),
    characteristics VARCHAR(30)
);

⚠️ Spring Boot 会在启动时自动执行这个脚本

3.2. Coffee 领域类

接下来,我们创建一个简单的领域类来表示咖啡对象:

public class Coffee {

    private String brand;
    private String origin;
    private String characteristics;

    public Coffee(String brand, String origin, String characteristics) {
        this.brand = brand;
        this.origin = origin;
        this.characteristics = characteristics;
    }

    // getters and setters
}

如上所示,Coffee 类包含三个字段:

  • 品牌(brand)
  • 产地(origin)
  • 特性(characteristics)

4. Job 配置

现在我们进入核心部分:Job 的配置。我们将逐步构建并解释每个组件。

@Configuration
public class BatchConfiguration {
    
    @Value("${file.input}")
    private String fileInput;
    
    // ...
}

首先是一个标准的 Spring @Configuration 类。⚠️ 注意:Spring Boot 3.0 不再推荐使用 @EnableBatchProcessing 注解。同时,JobBuilderFactoryStepBuilderFactory 已被弃用,建议使用 JobBuilderStepBuilder 并指定名称。

我们还注入了之前定义的 file.input 配置项。

4.1. Reader 和 Writer 配置

定义一个 FlatFileItemReader Bean 来读取 CSV 文件:

@Bean
public FlatFileItemReader reader() {
    return new FlatFileItemReaderBuilder().name("coffeeItemReader")
      .resource(new ClassPathResource(fileInput))
      .delimited()
      .names(new String[] { "brand", "origin", "characteristics" })
      .fieldSetMapper(new BeanWrapperFieldSetMapper() {{
          setTargetType(Coffee.class);
      }})
      .build();
}

这个 reader 会读取 coffee-list.csv 文件,将每一行解析为 Coffee 对象

然后定义一个 JdbcBatchItemWriter Bean 用于写入数据库:

@Bean
public JdbcBatchItemWriter writer(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder()
      .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
      .sql("INSERT INTO coffee (brand, origin, characteristics) VALUES (:brand, :origin, :characteristics)")
      .dataSource(dataSource)
      .build();
}

该 writer 会将 Coffee 对象中的属性通过 SQL 插入到数据库中。

4.2. 组装 Job

最后,我们定义 Job 和 Step:

@Bean
public Job importUserJob(JobRepository jobRepository, JobCompletionNotificationListener listener, Step step1) {
    return new JobBuilder("importUserJob", jobRepository)
      .incrementer(new RunIdIncrementer())
      .listener(listener)
      .flow(step1)
      .end()
      .build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager, JdbcBatchItemWriter writer) {
    return new StepBuilder("step1", jobRepository)
      .<Coffee, Coffee> chunk(10, transactionManager)
      .reader(reader())
      .processor(processor())
      .writer(writer)
      .build();
}

@Bean
public CoffeeItemProcessor processor() {
    return new CoffeeItemProcessor();
}

简单说明一下:

  • step1 定义了一个批处理步骤,每 10 条记录为一个 chunk。
  • 依次执行:读取 → 处理 → 写入。
  • importUserJob 是主任务,包含一个 Step,并注册了监听器。

我们还添加了一个 JobCompletionNotificationListener,用于在任务完成后执行回调逻辑

5. 自定义 Processor

下面是我们在 Job 中使用的自定义处理器:

public class CoffeeItemProcessor implements ItemProcessor<Coffee, Coffee> {

    private static final Logger LOGGER = LoggerFactory.getLogger(CoffeeItemProcessor.class);

    @Override
    public Coffee process(final Coffee coffee) throws Exception {
        String brand = coffee.getBrand().toUpperCase();
        String origin = coffee.getOrigin().toUpperCase();
        String chracteristics = coffee.getCharacteristics().toUpperCase();

        Coffee transformedCoffee = new Coffee(brand, origin, chracteristics);
        LOGGER.info("Converting ( {} ) into ( {} )", coffee, transformedCoffee);

        return transformedCoffee;
    }
}

该处理器将 Coffee 对象的每个字段都转换为大写,并记录日志。

6. Job 完成通知

我们定义了一个监听器来监听 Job 的完成事件:

@Override
public void afterJob(JobExecution jobExecution) {
    if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
        LOGGER.info("!!! JOB FINISHED! Time to verify the results");

        String query = "SELECT brand, origin, characteristics FROM coffee";
        jdbcTemplate.query(query, (rs, row) -> new Coffee(rs.getString(1), rs.getString(2), rs.getString(3)))
          .forEach(coffee -> LOGGER.info("Found < {} > in the database.", coffee));
    }
}

当 Job 成功完成后,会执行 afterJob 方法,查询数据库并打印结果。

7. 运行 Job

一切就绪后,运行程序,控制台输出如下:

...
17:41:16.336 [main] INFO  c.b.b.JobCompletionNotificationListener -
  !!! JOB FINISHED! Time to verify the results
17:41:16.336 [main] INFO  c.b.b.JobCompletionNotificationListener -
  Found < Coffee [brand=BLUE MOUNTAIN, origin=JAMAICA, characteristics=FRUITY] > in the database.
17:41:16.337 [main] INFO  c.b.b.JobCompletionNotificationListener -
  Found < Coffee [brand=LAVAZZA, origin=COLOMBIA, characteristics=STRONG] > in the database.
17:41:16.337 [main] INFO  c.b.b.JobCompletionNotificationListener -
  Found < Coffee [brand=FOLGERS, origin=AMERICA, characteristics=SMOKEY] > in the database.
...

Job 成功执行,数据也已正确写入数据库

8. 总结

本文我们演示了如何使用 Spring Boot 快速搭建一个 Spring Batch 应用:

  • 配置基础环境
  • 定义 Reader 和 Writer
  • 添加自定义 Processor
  • 使用 Listener 监听 Job 完成状态

完整代码可以在 GitHub 获取。


原始标题:Spring Boot With Spring Batch