1. 引言
本文将通过简洁实用的示例介绍 Spring Integration 的核心概念。Spring Integration 提供了大量强大的组件,能显著提升企业架构中系统和流程的互连能力。它融合了最优秀且最流行的设计模式,帮助开发者避免重复造轮子。
我们将探讨该库在企业应用中解决的具体需求,以及为什么它比某些替代方案更值得推荐。同时也会介绍一些能进一步简化 Spring Integration 应用开发的工具。
2. 环境配置
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>6.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>6.0.0</version>
</dependency>
可从 Maven 中央仓库下载最新版本的 Spring Integration Core 和 Spring Integration File Support。
3. 消息模式
该库的基础模式之一是消息模式。该模式以消息为中心——即通过预定义通道在源系统/进程与一个或多个目标系统/进程间传递的离散数据负载。
从历史看,该模式作为集成多个异构系统的最灵活方式应运而生,其优势在于:
- ✅ 几乎完全解耦参与集成的系统
- ✅ 允许参与者完全忽略彼此的底层协议、数据格式等实现细节
- ✅ 鼓励集成组件的开发和复用
4. 消息集成实战
来看一个基础示例:将 MPEG 视频文件从指定文件夹复制到配置的目标文件夹:
@Configuration
@EnableIntegration
public class BasicIntegrationConfig{
public String INPUT_DIR = "the_source_dir";
public String OUTPUT_DIR = "the_dest_dir";
public String FILE_PATTERN = "*.mpeg";
@Bean
public MessageChannel fileChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource sourceReader= new FileReadingMessageSource();
sourceReader.setDirectory(new File(INPUT_DIR));
sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
return sourceReader;
}
@Bean
@ServiceActivator(inputChannel= "fileChannel")
public MessageHandler fileWritingMessageHandler() {
FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
handler.setFileExistsMode(FileExistsMode.REPLACE);
handler.setExpectReply(false);
return handler;
}
}
上述代码配置了服务激活器、集成通道和入站通道适配器。@EnableIntegration
注解标记该类为 Spring Integration 配置类。
启动 Spring Integration 应用上下文:
public static void main(String... args) {
AbstractApplicationContext context
= new AnnotationConfigApplicationContext(BasicIntegrationConfig.class);
context.registerShutdownHook();
Scanner scanner = new Scanner(System.in);
System.out.print("请输入 q 并按回车键退出程序: ");
while (true) {
String input = scanner.nextLine();
if("q".equals(input.trim())) {
break;
}
}
System.exit(0);
}
主方法启动集成上下文,并接受命令行输入 "q" 退出程序。下面详细分析各组件。
5. Spring Integration 核心组件
5.1. 消息
org.springframework.integration.Message
接口定义了 Spring 消息——即 Spring Integration 上下文中的数据传输单元:
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
提供两个关键元素的访问器:
- 消息头:本质是键值容器,用于传输元数据(由
org.springframework.integration.MessageHeaders
类定义) - 消息负载:实际需要传输的有价值数据——本例中视频文件就是负载
5.2. 通道
Spring Integration(及整个 EAI)中的通道是集成架构的基础管道。它是消息在系统间传递的管道。
可将其视为字面意义的管道,集成系统或进程可通过它向其他系统推送(或接收)消息。
Spring Integration 提供多种通道类型,根据需求选择。它们大多可直接配置使用,无需自定义代码,但若有特殊需求,也提供了健壮的框架支持。
点对点(P2P)通道用于建立系统/组件间的 1 对 1 通信。一个组件向通道发布消息,另一个组件接收。通道两端只能各有一个组件。
配置通道只需返回 DirectChannel
实例:
@Bean
public MessageChannel fileChannel1() {
return new DirectChannel();
}
@Bean
public MessageChannel fileChannel2() {
return new DirectChannel();
}
@Bean
public MessageChannel fileChannel3() {
return new DirectChannel();
}
这里定义了三个独立通道,通过各自的 getter 方法名标识。
发布-订阅(Pub-Sub)通道用于建立系统/组件间的 1 对多通信。这允许我们向之前创建的三个点对点通道同时发布消息。
将示例中的 P2P 通道替换为 Pub-Sub 通道:
@Bean
public MessageChannel pubSubFileChannel() {
return new PublishSubscribeChannel();
}
@Bean
@InboundChannelAdapter(value = "pubSubFileChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource sourceReader = new FileReadingMessageSource();
sourceReader.setDirectory(new File(INPUT_DIR));
sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
return sourceReader;
}
现在入站通道适配器改为向 Pub-Sub 通道发布。这样就能将从源文件夹读取的文件发送到多个目标。
5.3. 桥接器
当两个消息通道或适配器无法直接连接时,Spring Integration 的桥接器用于建立连接。
本例中,我们用桥接器将 Pub-Sub 通道连接到三个不同的 P2P 通道(因为 P2P 和 Pub-Sub 通道不能直接连接):
@Bean
@BridgeFrom(value = "pubSubFileChannel")
public MessageChannel fileChannel1() {
return new DirectChannel();
}
@Bean
@BridgeFrom(value = "pubSubFileChannel")
public MessageChannel fileChannel2() {
return new DirectChannel();
}
@Bean
@BridgeFrom(value = "pubSubFileChannel")
public MessageChannel fileChannel3() {
return new DirectChannel();
}
上述配置将 pubSubFileChannel
桥接到三个 P2P 通道。@BridgeFrom
注解定义桥接关系,可应用于任意数量的订阅通道。
代码可解读为:"创建从 pubSubFileChannel
到 fileChannel1
、fileChannel2
和 fileChannel3
的桥接,使 pubSubFileChannel
的消息能同时传递到所有三个通道"。
5.4. 服务激活器
服务激活器是在方法上标注 @ServiceActivator
注解的任意 POJO。当从入站通道收到消息时,可执行 POJO 中的任意方法,并允许将消息写入出站通道。
示例中,服务激活器从配置的 input channel
接收文件,并将其写入配置的文件夹。
5.5. 适配器
适配器是基于企业集成模式的组件,允许"插入"系统或数据源。它就像我们熟知的电源Socket或电子设备适配器。
它提供了对数据库、FTP 服务器、JMS/AMQP 等消息系统、Twitter 等社交网络等"黑盒"系统的可复用连接能力。由于连接这些系统的需求普遍存在,适配器具有高度可移植性和可复用性(实际上有一个小型适配器目录,可免费使用)。
适配器分为两大类——入站和出站。
结合示例场景分析这两类:
入站适配器用于从外部系统(本例中为文件系统目录)获取消息。
入站适配器配置包含:
-
@InboundChannelAdapter
注解:标记 bean 配置为适配器——配置适配器发送消息的通道(本例为 MPEG 文件)和轮询器(组件按指定间隔轮询配置文件夹) - 标准 Spring Java 配置:返回
FileReadingMessageSource
(处理文件系统轮询的 Spring Integration 实现类)
出站适配器用于向外发送消息。Spring Integration 为各种常见用例提供了大量现成的出站适配器。
6. 总结
我们通过 Spring Integration 的基础用例,展示了该库的 Java 配置方式和组件的可复用性。
Spring Integration 代码既可作为独立项目部署在 JavaSE 环境,也可作为 Jakarta EE 环境中大型应用的一部分。虽然它不直接与企业服务总线(ESB)等 EAI 产品竞争,但为解决许多同类问题提供了一个轻量级的可行替代方案。