1. Overview

This tutorial is a quick introduction to Spring Batch, a framework focused on executing batch jobs. Spring Batch provides powerful batch-processing capabilities and suits enterprise data-processing scenarios that demand high reliability and scalability.

The current version, 5.2.0, supports Spring 6.2.0 and Java 17+.

👉 There are a number of practical use cases for the framework that are worth a look.

2. Batch Workflow Basics

Spring Batch follows the classic batch architecture, in which a JobRepository takes care of scheduling and managing Job executions.

A Job can contain multiple Steps, and each Step typically processes data following a read → process → write flow.

✅ The framework handles most of the low-level plumbing for us, including persisting Job metadata (for example, storing Job information in an H2 database).
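
To make the read → process → write cycle more concrete, here is a simplified conceptual sketch (not actual framework internals; the processOneChunk method name is invented purely for illustration) of what a chunk-oriented Step with a commit interval of 10 does with a reader, processor, and writer:

static <T> void processOneChunk(ItemReader<T> reader, ItemProcessor<T, T> processor,
  ItemWriter<T> writer) throws Exception {
    Chunk<T> chunk = new Chunk<>();
    for (int i = 0; i < 10; i++) {
        T item = reader.read();               // items are read one at a time
        if (item == null) {                   // null from the reader signals the end of the input
            break;
        }
        T processed = processor.process(item);
        if (processed != null) {              // a null result from the processor filters the item out
            chunk.add(processed);
        }
    }
    writer.write(chunk);                      // the whole chunk is written in a single transaction
}

In the real framework, this loop, the transaction boundaries, and the restart bookkeeping are all handled by the Step for us.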

2.1. Example Use Case

The use case we'll tackle is converting financial transaction data from CSV to XML.

The input file structure is very simple: one transaction record per line, with the fields username, user ID, transaction date, and transaction amount:

username, userid, transaction_date, transaction_amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321
robin, 2134, 2/02/2015, 23411

3. Maven Dependencies

The core dependencies the project needs are:

  • spring-batch-core: core batch-processing functionality
  • spring-oxm: Object/XML mapping support
  • h2: in-memory database for storing Job metadata
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
    <version>6.2.0</version>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>2.3.232</version>
</dependency>
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-core</artifactId>
    <version>5.2.0</version>
</dependency>

4. Creating the Spring Batch Tables Automatically

When using Spring Batch, we can initialize the database schema automatically with the predefined SQL scripts.

✅ When using the H2 in-memory database, Spring Boot executes the initialization scripts automatically.

For other databases, however, we need to set the following property manually to enable automatic initialization.

application.properties configuration:

spring.batch.jdbc.initialize-schema=always

Or, the application.yml configuration:

spring:
  batch:
    jdbc:
      initialize-schema: "always"

⚠️ Note: don't add the @EnableBatchProcessing annotation to the BatchConfig class; otherwise Spring Boot can't auto-configure the Batch-related components.

To disable automatic table creation, we can set the property to never:

spring.batch.jdbc.initialize-schema=never

Or:

spring:
  batch:
    jdbc:
      initialize-schema: "never"

5. Spring Batch Config and Job Config

Below is the Java-based configuration that implements the CSV-to-XML conversion.

Java configuration:

@Profile("spring")
public class SpringBatchConfig {

    @Value("input/record.csv")
    private Resource inputCsv;

    @Value("file:xml/output.xml")
    private Resource outputXml;

    @Bean
    public ItemReader<Transaction> itemReader()
      throws UnexpectedInputException, ParseException {
        FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        String[] tokens = { "username", "userid", "transactiondate", "amount" };
        tokenizer.setNames(tokens);
        reader.setResource(inputCsv);
        DefaultLineMapper<Transaction> lineMapper = 
          new DefaultLineMapper<Transaction>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
        reader.setLineMapper(lineMapper);
        return reader;
    }

    @Bean
    public ItemProcessor<Transaction, Transaction> itemProcessor() {
        return new CustomItemProcessor();
    }

    @Bean
    public ItemWriter<Transaction> itemWriter(Marshaller marshaller)
      throws MalformedURLException {
        StaxEventItemWriter<Transaction> itemWriter = 
          new StaxEventItemWriter<Transaction>();
        itemWriter.setMarshaller(marshaller);
        itemWriter.setRootTagName("transactionRecord");
        itemWriter.setResource(outputXml);
        return itemWriter;
    }

    @Bean
    public Marshaller marshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setClassesToBeBound(new Class[] { Transaction.class });
        return marshaller;
    }

    @Bean
    protected Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager, 
      ItemReader<Transaction> reader, ItemProcessor<Transaction, Transaction> processor, 
      ItemWriter<Transaction> writer) {
        return new StepBuilder("step1", jobRepository)
          .<Transaction, Transaction> chunk(10, transactionManager)
          .reader(reader).processor(processor).writer(writer).build();
    }

    @Bean(name = "firstBatchJob")
    public Job job(JobRepository jobRepository, @Qualifier("step1") Step step1) {
        return new JobBuilder("firstBatchJob", jobRepository).preventRestart().start(step1).build();
    }
    
    public DataSource dataSource() {
     EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
     return builder.setType(EmbeddedDatabaseType.H2)
       .addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
       .addScript("classpath:org/springframework/batch/core/schema-h2.sql")
       .build();
    }
    
    @Bean(name = "transactionManager")
    public PlatformTransactionManager getTransactionManager() {
        return new ResourcelessTransactionManager();
    }
    
    @Bean(name = "jobRepository")
    public JobRepository getJobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource());
        factory.setTransactionManager(getTransactionManager());
        factory.afterPropertiesSet();
        return factory.getObject();
    }
    
    @Bean(name = "jobLauncher")
    public JobLauncher getJobLauncher() throws Exception {
       TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
       jobLauncher.setJobRepository(getJobRepository());
       jobLauncher.afterPropertiesSet();
       return jobLauncher;
    }
}

XML configuration:

<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="input/record.csv" />
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="names" value="username,userid,transactiondate,amount" />
                </bean>
            </property>
            <property name="fieldSetMapper">
                <bean class="com.baeldung.batch.service.RecordFieldSetMapper" />
            </property>
        </bean>
    </property>
    <property name="linesToSkip" value="1" />
</bean>

<bean id="itemProcessor" class="com.baeldung.batch.service.CustomItemProcessor" />

<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="resource" value="file:xml/output.xml" />
    <property name="marshaller" ref="marshaller" />
    <property name="rootTagName" value="transactionRecord" />
</bean>

<bean id="marshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
    <property name="classesToBeBound">
        <list>
            <value>com.baeldung.batch.model.Transaction</value>
        </list>
    </property>
</bean>

<batch:job id="firstBatchJob">
    <batch:step id="step1">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter"
              processor="itemProcessor" commit-interval="10">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>

<!-- connect to H2 database -->
<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="org.h2.Driver" />
    <property name="url" value="jdbc:h2:file:~/repository" />
    <property name="username" value="" />
    <property name="password" value="" />
</bean>

<!-- stored job-meta in database -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    <property name="dataSource" ref="dataSource" />
    <property name="transactionManager" ref="transactionManager" />
    <property name="databaseType" value="h2" />
</bean>

<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>

5.1. Reading Data and Mapping to Objects: ItemReader

We use a FlatFileItemReader to read the CSV file and convert each line into a Transaction object:

@SuppressWarnings("restriction")
@XmlRootElement(name = "transactionRecord")
public class Transaction {
    private String username;
    private int userId;
    private LocalDateTime transactionDate;
    private double amount;

    /* getters and setters for the attributes */

    @Override
    public String toString() {
        return "Transaction [username=" + username + ", userId=" + userId
          + ", transactionDate=" + transactionDate + ", amount=" + amount
          + "]";
    }
}

Field mapping is handled by a custom FieldSetMapper:

public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {
 
    public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("d/M/yyy");
        Transaction transaction = new Transaction();
 
        transaction.setUsername(fieldSet.readString("username"));
        transaction.setUserId(fieldSet.readInt(1));
        transaction.setAmount(fieldSet.readDouble(3));
        String dateString = fieldSet.readString(2);
        transaction.setTransactionDate(LocalDate.parse(dateString, formatter).atStartOfDay());
        return transaction;
    }
}

5.2. Processing Data: ItemProcessor

We write our own CustomItemProcessor; it contains no real processing logic, but it is the component that hands each item from the Reader over to the Writer:

public class CustomItemProcessor implements ItemProcessor<Transaction, Transaction> {

    public Transaction process(Transaction item) {
        return item;
    }
}
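
In a real job, the processor is where transformation and filtering logic lives. As a hypothetical variation (the class name and threshold below are made up for illustration), a processor can drop records simply by returning null, which tells Spring Batch to filter the item out before it reaches the Writer:

public class MinimumAmountItemProcessor implements ItemProcessor<Transaction, Transaction> {

    // hypothetical business rule: ignore transactions below this amount
    private static final double MINIMUM_AMOUNT = 100.0;

    @Override
    public Transaction process(Transaction item) {
        if (item.getAmount() < MINIMUM_AMOUNT) {
            return null; // returning null filters the item out of the chunk
        }
        return item;
    }
}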

5.3. Writing Data: ItemWriter

Finally, the data is written out to an XML file:

<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="resource" value="file:xml/output.xml" />
    <property name="marshaller" ref="marshaller" />
    <property name="rootTagName" value="transactionRecord" />
</bean>

5.4. Configuring the Job

Finally, we wire the Reader, Processor, and Writer together into a complete Job. First in Java configuration:

@Bean
protected Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager,
  @Qualifier("itemProcessor") ItemProcessor<Transaction, Transaction> processor, ItemWriter<Transaction> writer) {
    return new StepBuilder("step1", jobRepository)
      .<Transaction, Transaction> chunk(10, transactionManager)
      .reader(itemReader(inputCsv))
      .processor(processor)
      .writer(writer)
      .build();
}
The equivalent XML configuration uses batch:job:

<batch:job id="firstBatchJob">
    <batch:step id="step1">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter" processor="itemProcessor" commit-interval="10">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>
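
With the configuration in place, the Job still has to be launched. Below is a minimal sketch of doing that from a main method, assuming the SpringBatchConfig class and the jobLauncher and firstBatchJob beans shown above (the App class name is just for illustration):

public class App {

    public static void main(String[] args) throws Exception {
        // activate the "spring" profile so that SpringBatchConfig is picked up
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.getEnvironment().setActiveProfiles("spring");
        context.register(SpringBatchConfig.class);
        context.refresh();

        JobLauncher jobLauncher = context.getBean("jobLauncher", JobLauncher.class);
        Job job = context.getBean("firstBatchJob", Job.class);

        // unique parameters make every run a new JobInstance, which matters because
        // the job is configured with preventRestart()
        JobParameters params = new JobParametersBuilder()
          .addLong("time", System.currentTimeMillis())
          .toJobParameters();

        JobExecution execution = jobLauncher.run(job, params);
        System.out.println("Exit Status: " + execution.getStatus());

        context.close();
    }
}

A successful run reads input/record.csv and writes the marshalled records to xml/output.xml under the transactionRecord root tag.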

6. Spring Boot Integration

6.1. Maven Dependencies

Add the spring-boot-starter-batch dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
    <version>3.4.0</version>
</dependency>

6.2. Application Class Configuration

We use @Profile to separate the configuration environments:

@SpringBootApplication
public class SpringBatchApplication {

    public static void main(String[] args) {
        SpringApplication springApp = new SpringApplication(SpringBatchApplication.class);
        springApp.setAdditionalProfiles("spring-boot");
        springApp.run(args);
    }

}

6.3. Spring Boot Batch Configuration

The earlier SpringBatchConfig is converted to the Spring Boot style:

@Configuration
public class SpringBootBatchConfig {

    @Value("input/record.csv")
    private Resource inputCsv;

    @Value("input/recordWithInvalidData.csv")
    private Resource invalidInputCsv;

    @Value("file:xml/output.xml")
    private Resource outputXml;

    // ...
}

⚠️ As of Spring Boot 3.0, the @EnableBatchProcessing annotation is discouraged; when it is present, Spring Boot backs off from auto-configuring the JobRepository, JobLauncher, and TransactionManager beans.
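
In practice, this means the Spring Boot version of the configuration only declares the reader, processor, writer, Step, and Job beans, while the JobRepository, JobLauncher, DataSource, and transaction manager come from the auto-configuration. A minimal sketch, assuming the same bean types and names as the earlier SpringBatchConfig:

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager,
  ItemReader<Transaction> itemReader, ItemProcessor<Transaction, Transaction> itemProcessor,
  ItemWriter<Transaction> itemWriter) {
    // jobRepository and transactionManager are provided by Spring Boot's auto-configuration
    return new StepBuilder("step1", jobRepository)
      .<Transaction, Transaction> chunk(10, transactionManager)
      .reader(itemReader)
      .processor(itemProcessor)
      .writer(itemWriter)
      .build();
}

@Bean(name = "firstBatchJob")
public Job job(JobRepository jobRepository, @Qualifier("step1") Step step1) {
    return new JobBuilder("firstBatchJob", jobRepository)
      .preventRestart()
      .start(step1)
      .build();
}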

7. Conclusion

Spring Batch is a powerful, well-structured batch-processing framework that is a great fit for reading, processing, and writing large volumes of data.

✅ By combining a Reader, a Processor, and a Writer, we can easily build batch jobs that are scalable and maintainable.

⚠️ When integrating with Spring Boot, keep the configuration changes described above in mind to avoid common pitfalls.


Original title: Introduction to Spring Batch | Baeldung
