1. 简介
本文将带你掌握 Spring Integration 的 Java DSL(领域特定语言),用于构建高效的应用集成方案。
我们会复刻之前在《Spring Integration 入门》中用 XML 配置实现的“文件搬运”功能,但这次完全使用 Java DSL 实现。✅
这种方式更符合现代 Java 开发习惯,代码可读性更强,也更容易调试和维护。
2. 依赖配置
Spring Integration Java DSL 已集成在 spring-integration-core
模块中,无需额外引入 DSL 专用包。
只需添加以下核心依赖:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>6.0.0</version>
</dependency>
由于我们要实现文件搬运功能,还需引入文件模块:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>6.0.0</version>
</dependency>
3. Spring Integration Java DSL 概述
在 Java DSL 出现之前,Spring Integration 主要通过 XML 配置组件,略显繁琐。
✅ Java DSL 的核心优势是提供了流畅的构建器(fluent builders),让我们能用纯 Java 代码构建完整的集成流水线。
举个简单例子:实现一个将输入字符串转为大写的通道。
传统 XML 写法:
<int:channel id="input"/>
<int:transformer input-channel="input" expression="payload.toUpperCase()" />
使用 Java DSL 后,变得简洁直观:
@Bean
public IntegrationFlow upcaseFlow() {
return IntegrationFlow.from("input")
.transform(String::toUpperCase)
.get();
}
是不是简单粗暴多了?👏
4. 文件搬运应用实战
我们从零开始构建一个监听目录、过滤 JPG 文件并自动搬运的集成流程。
4.1. 集成流(Integration Flow)
集成流是 Spring Integration 的核心构建块,通过 IntegrationFlows
构建器创建:
IntegrationFlows.from(...)
from
方法支持多种输入类型,本文重点介绍三种:
MessageSource
:消息源MessageChannel
:消息通道String
:通道名称
构建流程的基本模式如下:
IntegrationFlow flow = IntegrationFlow.from(sourceDirectory())
.filter(onlyJpgs())
.handle(targetDirectory())
// 可继续添加其他组件
.get();
⚠️ 最终通过 .get()
返回的 IntegrationFlow
实例,必须注册为 Spring Bean 才能激活。
所有 Spring Integration 应用的本质,都是“输入 → 转换 → 输出”的流水线模式。
4.2. 定义输入源(Input Source)
要搬运文件,首先得告诉系统从哪读取文件——这就是 MessageSource
的职责。
简单来说,MessageSource
是外部数据源(如文件系统、消息队列)与 Spring 消息模型之间的适配器。这类组件也常被称为 输入通道适配器(Input Channel Adapter)。
我们使用 spring-integration-file
提供的 FileReadingMessageSource
:
@Bean
public MessageSource<File> sourceDirectory() {
FileReadingMessageSource messageSource = new FileReadingMessageSource();
messageSource.setDirectory(new File("/tmp/input")); // 示例路径
return messageSource;
}
然后将其作为集成流的起点:
IntegrationFlow.from(sourceDirectory());
4.3. 配置输入源轮询策略
如果希望应用能持续监听新文件(而不是只处理启动时已存在的文件),就需要配置轮询器(poller)。
IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)));
上述代码表示每 10 秒轮询一次输入目录,检测新文件。✅
这个机制适用于所有 MessageSource
,非常实用。
4.4. 消息过滤(Filter)
假设我们只想搬运 JPG 图片文件,可以使用过滤器。
先定义一个 GenericSelector
Bean:
@Bean
public GenericSelector<File> onlyJpgs() {
return new GenericSelector<File>() {
@Override
public boolean accept(File source) {
return source.getName().endsWith(".jpg");
}
};
}
在流程中使用:
IntegrationFlow.from(sourceDirectory())
.filter(onlyJpgs());
⚠️ 由于逻辑简单,更推荐使用 Lambda 写法,代码更简洁:
IntegrationFlow.from(sourceDirectory())
.filter(source -> ((File) source).getName().endsWith(".jpg"));
4.5. 使用服务激活器处理消息(Service Activator)
过滤后的文件需要写入目标目录,这时用到 服务激活器(Service Activator)。
我们使用 FileWritingMessageHandler
:
@Bean
public MessageHandler targetDirectory() {
FileWritingMessageHandler handler = new FileWritingMessageHandler(new File("/tmp/output")); // 示例路径
handler.setFileExistsMode(FileExistsMode.REPLACE);
handler.setExpectReply(false);
return handler;
}
关键参数说明:
setFileExistsMode(REPLACE)
:文件已存在时直接覆盖setExpectReply(false)
:单向通信,不期望响应 ❌(双向流才需要回复)
加入流程:
IntegrationFlow.from(sourceDirectory())
.filter(onlyJpgs())
.handle(targetDirectory());
4.6. 激活集成流
最后,将整个流程注册为 Spring Bean 以激活:
@Bean
public IntegrationFlow fileMover() {
return IntegrationFlow.from(sourceDirectory(), c -> c.poller(Pollers.fixedDelay(10000)))
.filter(onlyJpgs())
.handle(targetDirectory())
.get();
}
✅ get()
方法返回 IntegrationFlow
实例,Spring 容器启动时会自动初始化所有组件。
至此,应用启动后便会自动监听 /tmp/input
目录,搬运 JPG 文件到 /tmp/output
。
5. 其他常用组件
除了基础的输入适配器、过滤器和服务激活器,Spring Integration 还提供了丰富的扩展组件。
5.1. 消息通道(Message Channel)
消息通道用于解耦生产者与消费者,可类比为 Java 中的 Queue
。
可通过名称引用通道:
IntegrationFlow.from("anyChannel")
也可自定义通道,例如按文件名排序的优先级通道:
@Bean
public PriorityChannel alphabetically() {
return new PriorityChannel(1000, (left, right) ->
((File)left.getPayload()).getName().compareTo(
((File)right.getPayload()).getName()));
}
在流程中插入通道:
@Bean
public IntegrationFlow fileMover() {
return IntegrationFlow.from(sourceDirectory())
.filter(onlyJpgs())
.channel("alphabetically")
.handle(targetDirectory())
.get();
}
✅ 通道用途广泛,常用于:
- 并发处理(如
ExecutorChannel
) - 审计日志
- 中间持久化(如对接 Kafka、JMS)
结合 Bridge 使用更灵活。
5.2. 桥接器(Bridge)
Bridge 用于连接两个通道,实现解耦和异步处理。
假设我们先将文件写入中间通道 holdingTank
:
@Bean
public IntegrationFlow fileReader() {
return IntegrationFlow.from(sourceDirectory())
.filter(onlyJpgs())
.channel("holdingTank")
.get();
}
再通过 Bridge 将 holdingTank
中的消息搬运到目标目录:
@Bean
public IntegrationFlow fileWriter() {
return IntegrationFlow.from("holdingTank")
.bridge(e -> e.poller(Pollers.fixedRate(Duration.ofSeconds(1), Duration.ofSeconds(20))))
.handle(targetDirectory())
.get();
}
更强大的是,可以创建多个 Bridge,以不同频率处理同一通道的消息:
@Bean
public IntegrationFlow anotherFileWriter() {
return IntegrationFlow.from("holdingTank")
.bridge(e -> e.poller(Pollers.fixedRate(Duration.ofSeconds(2), Duration.ofSeconds(10))))
.handle(anotherTargetDirectory())
.get();
}
✅ 这种模式非常适合:
- 多消费者场景
- 不同处理速率的需求
- 解耦数据采集与处理逻辑
应用启动后,文件会从源目录搬运到两个不同的目标目录,且搬运速率独立控制。
6. 总结
本文通过一个文件搬运的实战案例,全面展示了 Spring Integration Java DSL 的使用方式。
✅ 核心要点回顾:
- 使用
IntegrationFlow.from(...)
构建流水线 MessageSource
作为输入源,配合poller
实现轮询filter
和handle
实现过滤与处理- 通过
channel
和bridge
实现复杂拓扑结构
Java DSL 让集成逻辑更清晰、更易维护,推荐在新项目中优先使用。
完整示例代码已托管至 GitHub:https://github.com/example/spring-integration-demo