1. 概述

Spring Batch是Java中强大的批处理框架,因此成为数据处理活动和定时作业运行的热门选择。根据业务逻辑的复杂程度,作业可能依赖不同的配置值和动态参数。

在本文中,我们将探讨如何使用JobParameters以及如何从核心批处理组件中访问它们。

2. 演示设置

我们将为药房服务开发一个Spring Batch作业。主要业务任务是查找即将过期的药品,根据销售情况计算新价格,并通知消费者即将过期的药品。此外,我们将从内存H2数据库中读取数据,并将所有处理细节写入日志以简化实现。

2.1. 依赖项

要开始演示应用程序,我们需要添加Spring Batch和H2依赖项:

<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>2.2.224</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
    <version>3.2.0</version>
</dependency>

我们可以在Maven中央仓库中找到最新的H2Spring Batch版本。

2.2. 准备测试数据

首先,在schema-all.sql中定义schema

DROP TABLE medicine IF EXISTS;

CREATE TABLE medicine  (
    med_id VARCHAR(36) PRIMARY KEY,
    name VARCHAR(30),
    type VARCHAR(30),
    expiration_date TIMESTAMP,
    original_price DECIMAL,
    sale_price DECIMAL
);

初始测试数据在data.sql中提供:

INSERT INTO medicine VALUES ('ec278dd3-87b9-4ad1-858f-dfe5bc34bdb5', 'Lidocaine', 'ANESTHETICS', DATEADD('DAY', 120, CURRENT_DATE), 10, null);
INSERT INTO medicine VALUES ('9d39321d-34f3-4eb7-bb9a-a69734e0e372', 'Flucloxacillin', 'ANTIBACTERIALS', DATEADD('DAY', 40, CURRENT_DATE), 20, null);
INSERT INTO medicine VALUES ('87f4ff13-de40-4c7f-95db-627f309394dd', 'Amoxicillin', 'ANTIBACTERIALS', DATEADD('DAY', 70, CURRENT_DATE), 30, null);
INSERT INTO medicine VALUES ('acd99d6a-27be-4c89-babe-0edf4dca22cb', 'Prozac', 'ANTIDEPRESSANTS', DATEADD('DAY', 30, CURRENT_DATE), 40, null);

Spring Boot在应用程序启动时运行这些文件,我们将在测试执行中使用这些测试数据。

2.3. Medicine领域类

对于我们的服务,我们需要一个简单的Medicine实体类:

@AllArgsConstructor
@Data
public class Medicine {
    private UUID id;
    private String name;
    private MedicineCategory type;
    private Timestamp expirationDate;
    private Double originalPrice;
    private Double salePrice;
}

ItemReader使用expirationDate字段来计算药品是否即将过期。当药品接近过期日期时,ItemProcessor将更新salePrice字段。

2.4. 应用程序属性

应用程序需要在src/main/resources/application.properties文件中设置多个属性:

spring.batch.job.enabled=false
batch.medicine.cron=0 */1 * * * *
batch.medicine.alert_type=LOGS
batch.medicine.expiration.default.days=60
batch.medicine.start.sale.default.days=45
batch.medicine.sale=0.1

由于我们只配置一个作业,应将spring.batch.job.enabled设置为false以禁用初始作业执行。默认情况下,Spring在上下文启动后使用空参数运行作业:

[main] INFO  o.s.b.a.b.JobLauncherApplicationRunner - Running default command line with: []

其他属性是InputReaderInputProcessorInpurWriter执行业务逻辑所必需的。

3. 作业参数

Spring Batch包含一个JobParameters类,用于存储特定作业运行的运行时参数。此功能在各种情况下都很有用。例如,它允许传递在特定运行期间生成的动态变量。此外,它还可以创建一个控制器,该控制器可以根据客户端提供的参数启动作业。

在我们的场景中,我们将使用此类来保存应用程序参数和动态运行时参数。

3.1. StepScopeJobScope

除了常规Spring中众所周知的bean作用域外,Spring Batch引入了两个额外的作用域:StepScopeJobScope。使用这些作用域,可以为工作流中的每个步骤或作业创建唯一的bean。Spring确保与特定步骤/作业关联的资源在其整个生命周期中隔离并独立管理。

有了这个功能,我们可以轻松控制上下文,并在特定运行的读取、处理和写入部分之间共享所有需要的属性。为了能够注入作业参数,我们需要使用@StepScope*或@JobScope注解依赖的bean。*

3.2. 在定时执行中填充作业参数

让我们定义MedExpirationBatchRunner类,它将通过cron表达式(在我们的例子中是每1分钟)启动我们的作业。我们应该使用@EnableScheduling*注解该类并定义适当的@Scheduled入口方法:*

@Component
@EnableScheduling
public class MedExpirationBatchRunner {
    ...
    @Scheduled(cron = "${batch.medicine.cron}", zone = "GMT")
    public void runJob() {
        ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
        launchJob(now);
    }
}

由于我们要手动启动作业,应该使用JobLaucher类并在JobLauncher#run()方法中提供填充的JobParameter。在我们的示例中,我们提供了来自application.properties的值以及两个运行特定的参数(作业被触发的日期和跟踪ID):

public void launchJob(ZonedDateTime triggerZonedDateTime) {
    try {
        JobParameters jobParameters = new JobParametersBuilder()
          .addString(BatchConstants.TRIGGERED_DATE_TIME, triggerZonedDateTime.toString())
          .addString(BatchConstants.ALERT_TYPE, alertType)
          .addLong(BatchConstants.DEFAULT_EXPIRATION, defaultExpiration)
          .addLong(BatchConstants.SALE_STARTS_DAYS, saleStartDays)
          .addDouble(BatchConstants.MEDICINE_SALE, medicineSale)
          .addString(BatchConstants.TRACE_ID, UUID.randomUUID().toString())
          .toJobParameters();

        jobLauncher.run(medExpirationJob, jobParameters);
    } catch (Exception e) {
        log.error("Failed to run", e);
    }
}

配置参数后,我们有几种选择在代码中使用这些值。

3.3. 在Bean定义中读取作业参数

使用SpEL,我们可以从配置类中的bean定义访问作业参数。Spring将所有参数组合成一个常规的StringObject的映射:

@Bean
@StepScope
public MedicineProcessor medicineProcessor(@Value("#{jobParameters}") Map<String, Object> jobParameters) {
    ...
}

在方法内部,我们将使用jobParameters来初始化MedicineProcessor的适当字段。

3.4. 在服务中直接读取作业参数

另一种选择是在ItemReader本身中使用setter注入。我们可以通过SpEL表达式获取确切的参数值,就像从任何其他映射中获取一样:

@Setter
public class ExpiresSoonMedicineReader extends AbstractItemCountingItemStreamItemReader<Medicine> {

    @Value("#{jobParameters['DEFAULT_EXPIRATION']}")
    private long defaultExpiration;
}

我们只需要确保SpEL中使用的键与参数初始化期间使用的键相同。

3.5. 通过Before Step读取作业参数

Spring Batch提供了一个StepExecutionListener接口,允许我们监听步骤执行阶段:步骤开始前和步骤完成后。我们可以利用此功能,在步骤开始之前访问属性并执行任何自定义逻辑。最简单的方法是使用*@BeforeStep注解,它对应于StepExecutionListener中的beforeStep()*方法:

@BeforeStep
public void beforeStep(StepExecution stepExecution) {
    JobParameters parameters = stepExecution.getJobExecution()
      .getJobParameters();
    ...
    log.info("Before step params: {}", parameters);
}

4. 作业配置

让我们将所有部分组合起来,看看全貌。

有两个属性是读取器、处理器和写入器所必需的:BatchConstants.TRIGGERED_DATE_TIMEBatchConstants.TRACE_ID

我们将对所有步骤bean定义中的公共参数使用相同的提取逻辑:

private void enrichWithJobParameters(Map<String, Object> jobParameters, ContainsJobParameters container) {
    if (jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME) != null) {
        container.setTriggeredDateTime(ZonedDateTime.parse(jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME)
          .toString()));
    }
    if (jobParameters.get(BatchConstants.TRACE_ID) != null) {
        container.setTraceId(jobParameters.get(BatchConstants.TRACE_ID).toString());
    }
}

其他参数都是特定于组件的,没有通用逻辑。

4.1. 配置ItemReader

首先,我们要配置ExpiresSoonMedicineReader并丰富公共参数:

@Bean
@StepScope
public ExpiresSoonMedicineReader expiresSoonMedicineReader(JdbcTemplate jdbcTemplate, @Value("#{jobParameters}") Map<String, Object> jobParameters) {

    ExpiresSoonMedicineReader medicineReader = new ExpiresSoonMedicineReader(jdbcTemplate);
    enrichWithJobParameters(jobParameters, medicineReader);
    return medicineReader;
}

让我们仔细看看具体的读取器实现。TriggeredDateTimetraceId参数在bean构造期间直接注入,defaultExpiration参数由Spring通过setter注入。为了演示,我们在*doOpen()*方法中使用了所有这些参数:

public class ExpiresSoonMedicineReader extends AbstractItemCountingItemStreamItemReader<Medicine> implements ContainsJobParameters {

    private ZonedDateTime triggeredDateTime;
    private String traceId;
    @Value("#{jobParameters['DEFAULT_EXPIRATION']}")
    private long defaultExpiration;

    private List<Medicine> expiringMedicineList;

    ...

    @Override
    protected void doOpen() {
        expiringMedicineList = jdbcTemplate.query(FIND_EXPIRING_SOON_MEDICINE, ps -> ps.setLong(1, defaultExpiration), (rs, row) -> getMedicine(rs));

        log.info("Trace = {}. Found {} meds that expires soon", traceId, expiringMedicineList.size());
        if (!expiringMedicineList.isEmpty()) {
            setMaxItemCount(expiringMedicineList.size());
        }
    }

    @PostConstruct
    public void init() {
        setName(ClassUtils.getShortName(getClass()));
    }

}

ItemReader不应标记为*@Component。此外,我们需要调用setName()*方法来设置所需的读取器名称。

4.2. 配置ItemProcessorItemWriter

ItemProcessorItemWriter遵循与ItemReader相同的方法。因此,它们不需要任何特定配置来访问参数。bean定义逻辑通过*enrichWithJobParameters()*方法初始化公共参数。其他由单个类使用且不需要在所有组件中填充的参数,由Spring通过相应类中的setter注入进行丰富。

我们应该将所有依赖属性的bean标记为*@StepScope*注解。否则,Spring只会在上下文启动时创建一次bean,并且没有参数值可以注入。

4.3. 配置完整流程

我们不需要采取任何特定操作来配置带参数的作业。因此,我们只需要组合所有bean:

@Bean
public Job medExpirationJob(JobRepository jobRepository,
    PlatformTransactionManager transactionManager,
    MedicineWriter medicineWriter,
    MedicineProcessor medicineProcessor,
    ExpiresSoonMedicineReader expiresSoonMedicineReader) {
    Step notifyAboutExpiringMedicine = new StepBuilder("notifyAboutExpiringMedicine", jobRepository).<Medicine, Medicine>chunk(10)
      .reader(expiresSoonMedicineReader)
      .processor(medicineProcessor)
      .writer(medicineWriter)
      .faultTolerant()
      .transactionManager(transactionManager)
      .build();

    return new JobBuilder("medExpirationJob", jobRepository)
      .incrementer(new RunIdIncrementer())
      .start(notifyAboutExpiringMedicine)
      .build();
}

5. 运行应用程序

让我们运行一个完整的示例,看看应用程序如何使用所有参数。我们需要从SpringBatchExpireMedicationApplication类启动Spring Boot应用程序。

一旦定时方法执行,Spring会记录所有参数:

INFO  o.s.b.c.l.support.SimpleJobLauncher - Job: [SimpleJob: [name=medExpirationJob]] launched with the following parameters: [{'SALE_STARTS_DAYS':'{value=45, type=class java.lang.Long, identifying=true}','MEDICINE_SALE':'{value=0.1, type=class java.lang.Double, identifying=true}','TRACE_ID':'{value=e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, type=class java.lang.String, identifying=true}','ALERT_TYPE':'{value=LOGS, type=class java.lang.String, identifying=true}','TRIGGERED_DATE_TIME':'{value=2023-12-06T22:36:00.011436600Z, type=class java.lang.String, identifying=true}','DEFAULT_EXPIRATION':'{value=60, type=class java.lang.Long, identifying=true}'}]

首先,ItemReader根据DEFAULT_EXPIRATION参数写入有关找到的药品的信息:

INFO  c.b.b.job.ExpiresSoonMedicineReader - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. Found 2 meds that expires soon

其次,ItemProcessor使用SALE_STARTS_DAYSMEDICINE_SALE参数计算新价格:

INFO  c.b.b.job.MedicineProcessor - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, calculated new sale price 18.0 for medicine 9d39321d-34f3-4eb7-bb9a-a69734e0e372
INFO  c.b.b.job.MedicineProcessor - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, calculated new sale price 36.0 for medicine acd99d6a-27be-4c89-babe-0edf4dca22cb

最后,ItemWriter在同一跟踪中将更新的药品写入日志:

INFO  c.b.b.job.MedicineWriter - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. This medicine is expiring Medicine(id=9d39321d-34f3-4eb7-bb9a-a69734e0e372, name=Flucloxacillin, type=ANTIBACTERIALS, expirationDate=2024-01-16 00:00:00.0, originalPrice=20.0, salePrice=18.0)
INFO  c.b.b.job.MedicineWriter - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. This medicine is expiring Medicine(id=acd99d6a-27be-4c89-babe-0edf4dca22cb, name=Prozac, type=ANTIDEPRESSANTS, expirationDate=2024-01-06 00:00:00.0, originalPrice=40.0, salePrice=36.0)
INFO  c.b.b.job.MedicineWriter - Finishing job started at 2023-12-07T11:58:00.014430400Z

6. 结论

在本文中,我们学习了如何在Spring Batch中使用作业参数。ItemReaderItemProcessorItemWriter可以在bean初始化期间手动填充参数,也可以通过*@BeforeStep*或setter注入由Spring填充。

一如既往,完整的示例可在GitHub上找到。


原始标题:Access Job Parameters From ItemReader in Spring Batch | Baeldung