1. 简介

本文将带你掌握 Spring Integration 的 Java DSL(领域特定语言),用于构建高效的应用集成方案。

我们会复刻之前在《Spring Integration 入门》中用 XML 配置实现的“文件搬运”功能,但这次完全使用 Java DSL 实现。✅
这种方式更符合现代 Java 开发习惯,代码可读性更强,也更容易调试和维护。

2. 依赖配置

Spring Integration Java DSL 已集成在 spring-integration-core 模块中,无需额外引入 DSL 专用包。

只需添加以下核心依赖:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>6.0.0</version>
</dependency>

由于我们要实现文件搬运功能,还需引入文件模块:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
    <version>6.0.0</version>
</dependency>

3. Spring Integration Java DSL 概述

在 Java DSL 出现之前,Spring Integration 主要通过 XML 配置组件,略显繁琐。

Java DSL 的核心优势是提供了流畅的构建器(fluent builders),让我们能用纯 Java 代码构建完整的集成流水线。

举个简单例子:实现一个将输入字符串转为大写的通道。

传统 XML 写法:

<int:channel id="input"/>

<int:transformer input-channel="input" expression="payload.toUpperCase()" />

使用 Java DSL 后,变得简洁直观:

@Bean
public IntegrationFlow upcaseFlow() {
    return IntegrationFlow.from("input")
      .transform(String::toUpperCase)
      .get();
}

是不是简单粗暴多了?👏

4. 文件搬运应用实战

我们从零开始构建一个监听目录、过滤 JPG 文件并自动搬运的集成流程。

4.1. 集成流(Integration Flow)

集成流是 Spring Integration 的核心构建块,通过 IntegrationFlows 构建器创建:

IntegrationFlows.from(...)

from 方法支持多种输入类型,本文重点介绍三种:

  • MessageSource:消息源
  • MessageChannel:消息通道
  • String:通道名称

构建流程的基本模式如下:

IntegrationFlow flow = IntegrationFlow.from(sourceDirectory())
  .filter(onlyJpgs())
  .handle(targetDirectory())
  // 可继续添加其他组件
  .get();

⚠️ 最终通过 .get() 返回的 IntegrationFlow 实例,必须注册为 Spring Bean 才能激活。

所有 Spring Integration 应用的本质,都是“输入 → 转换 → 输出”的流水线模式。

4.2. 定义输入源(Input Source)

要搬运文件,首先得告诉系统从哪读取文件——这就是 MessageSource 的职责。

简单来说,MessageSource 是外部数据源(如文件系统、消息队列)与 Spring 消息模型之间的适配器。这类组件也常被称为 输入通道适配器(Input Channel Adapter)

我们使用 spring-integration-file 提供的 FileReadingMessageSource

@Bean
public MessageSource<File> sourceDirectory() {
    FileReadingMessageSource messageSource = new FileReadingMessageSource();
    messageSource.setDirectory(new File("/tmp/input")); // 示例路径
    return messageSource;
}

然后将其作为集成流的起点:

IntegrationFlow.from(sourceDirectory());

4.3. 配置输入源轮询策略

如果希望应用能持续监听新文件(而不是只处理启动时已存在的文件),就需要配置轮询器(poller)。

IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)));

上述代码表示每 10 秒轮询一次输入目录,检测新文件。✅
这个机制适用于所有 MessageSource,非常实用。

4.4. 消息过滤(Filter)

假设我们只想搬运 JPG 图片文件,可以使用过滤器。

先定义一个 GenericSelector Bean:

@Bean
public GenericSelector<File> onlyJpgs() {
    return new GenericSelector<File>() {
        @Override
        public boolean accept(File source) {
          return source.getName().endsWith(".jpg");
        }
    };
}

在流程中使用:

IntegrationFlow.from(sourceDirectory())
  .filter(onlyJpgs());

⚠️ 由于逻辑简单,更推荐使用 Lambda 写法,代码更简洁:

IntegrationFlow.from(sourceDirectory())
  .filter(source -> ((File) source).getName().endsWith(".jpg"));

4.5. 使用服务激活器处理消息(Service Activator)

过滤后的文件需要写入目标目录,这时用到 服务激活器(Service Activator)

我们使用 FileWritingMessageHandler

@Bean
public MessageHandler targetDirectory() {
    FileWritingMessageHandler handler = new FileWritingMessageHandler(new File("/tmp/output")); // 示例路径
    handler.setFileExistsMode(FileExistsMode.REPLACE);
    handler.setExpectReply(false);
    return handler;
}

关键参数说明:

  • setFileExistsMode(REPLACE):文件已存在时直接覆盖
  • setExpectReply(false):单向通信,不期望响应 ❌(双向流才需要回复)

加入流程:

IntegrationFlow.from(sourceDirectory())
  .filter(onlyJpgs())
  .handle(targetDirectory());

4.6. 激活集成流

最后,将整个流程注册为 Spring Bean 以激活:

@Bean
public IntegrationFlow fileMover() {
    return IntegrationFlow.from(sourceDirectory(), c -> c.poller(Pollers.fixedDelay(10000)))
      .filter(onlyJpgs())
      .handle(targetDirectory())
      .get();
}

get() 方法返回 IntegrationFlow 实例,Spring 容器启动时会自动初始化所有组件。

至此,应用启动后便会自动监听 /tmp/input 目录,搬运 JPG 文件到 /tmp/output

5. 其他常用组件

除了基础的输入适配器、过滤器和服务激活器,Spring Integration 还提供了丰富的扩展组件。

5.1. 消息通道(Message Channel)

消息通道用于解耦生产者与消费者,可类比为 Java 中的 Queue

可通过名称引用通道:

IntegrationFlow.from("anyChannel")

也可自定义通道,例如按文件名排序的优先级通道:

@Bean
public PriorityChannel alphabetically() {
    return new PriorityChannel(1000, (left, right) -> 
      ((File)left.getPayload()).getName().compareTo(
        ((File)right.getPayload()).getName()));
}

在流程中插入通道:

@Bean
public IntegrationFlow fileMover() {
    return IntegrationFlow.from(sourceDirectory())
      .filter(onlyJpgs())
      .channel("alphabetically")
      .handle(targetDirectory())
      .get();
}

✅ 通道用途广泛,常用于:

  • 并发处理(如 ExecutorChannel
  • 审计日志
  • 中间持久化(如对接 Kafka、JMS)

结合 Bridge 使用更灵活。

5.2. 桥接器(Bridge)

Bridge 用于连接两个通道,实现解耦和异步处理。

假设我们先将文件写入中间通道 holdingTank

@Bean
public IntegrationFlow fileReader() {
    return IntegrationFlow.from(sourceDirectory())
      .filter(onlyJpgs())
      .channel("holdingTank")
      .get();
}

再通过 Bridge 将 holdingTank 中的消息搬运到目标目录:

@Bean
public IntegrationFlow fileWriter() {
    return IntegrationFlow.from("holdingTank")
      .bridge(e -> e.poller(Pollers.fixedRate(Duration.ofSeconds(1), Duration.ofSeconds(20))))
      .handle(targetDirectory())
      .get();
}

更强大的是,可以创建多个 Bridge,以不同频率处理同一通道的消息:

@Bean
public IntegrationFlow anotherFileWriter() {
    return IntegrationFlow.from("holdingTank")
      .bridge(e -> e.poller(Pollers.fixedRate(Duration.ofSeconds(2), Duration.ofSeconds(10))))
      .handle(anotherTargetDirectory())
      .get();
}

✅ 这种模式非常适合:

  • 多消费者场景
  • 不同处理速率的需求
  • 解耦数据采集与处理逻辑

应用启动后,文件会从源目录搬运到两个不同的目标目录,且搬运速率独立控制。

6. 总结

本文通过一个文件搬运的实战案例,全面展示了 Spring Integration Java DSL 的使用方式。

✅ 核心要点回顾:

  • 使用 IntegrationFlow.from(...) 构建流水线
  • MessageSource 作为输入源,配合 poller 实现轮询
  • filterhandle 实现过滤与处理
  • 通过 channelbridge 实现复杂拓扑结构

Java DSL 让集成逻辑更清晰、更易维护,推荐在新项目中优先使用。

完整示例代码已托管至 GitHub:https://github.com/example/spring-integration-demo


原始标题:Spring Integration Java DSL