1. 引言

本文将通过简洁实用的示例介绍 Spring Integration 的核心概念。Spring Integration 提供了大量强大的组件,能显著提升企业架构中系统和流程的互连能力。它融合了最优秀且最流行的设计模式,帮助开发者避免重复造轮子。

我们将探讨该库在企业应用中解决的具体需求,以及为什么它比某些替代方案更值得推荐。同时也会介绍一些能进一步简化 Spring Integration 应用开发的工具。

2. 环境配置

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>6.0.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
    <version>6.0.0</version>
</dependency>

可从 Maven 中央仓库下载最新版本的 Spring Integration CoreSpring Integration File Support

3. 消息模式

该库的基础模式之一是消息模式。该模式以消息为中心——即通过预定义通道在源系统/进程与一个或多个目标系统/进程间传递的离散数据负载。

从历史看,该模式作为集成多个异构系统的最灵活方式应运而生,其优势在于:

  • 几乎完全解耦参与集成的系统
  • 允许参与者完全忽略彼此的底层协议、数据格式等实现细节
  • 鼓励集成组件的开发和复用

4. 消息集成实战

来看一个基础示例:将 MPEG 视频文件从指定文件夹复制到配置的目标文件夹:

@Configuration
@EnableIntegration
public class BasicIntegrationConfig{
    public String INPUT_DIR = "the_source_dir";
    public String OUTPUT_DIR = "the_dest_dir";
    public String FILE_PATTERN = "*.mpeg";

    @Bean
    public MessageChannel fileChannel() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "fileChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> fileReadingMessageSource() {
        FileReadingMessageSource sourceReader= new FileReadingMessageSource();
        sourceReader.setDirectory(new File(INPUT_DIR));
        sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
        return sourceReader;
    }

    @Bean
    @ServiceActivator(inputChannel= "fileChannel")
    public MessageHandler fileWritingMessageHandler() {
        FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
        handler.setFileExistsMode(FileExistsMode.REPLACE);
        handler.setExpectReply(false);
        return handler;
    }
}

上述代码配置了服务激活器、集成通道和入站通道适配器。@EnableIntegration 注解标记该类为 Spring Integration 配置类。

启动 Spring Integration 应用上下文:

public static void main(String... args) {
    AbstractApplicationContext context 
      = new AnnotationConfigApplicationContext(BasicIntegrationConfig.class);
    context.registerShutdownHook();
    
    Scanner scanner = new Scanner(System.in);
    System.out.print("请输入 q 并按回车键退出程序: ");
    
    while (true) {
       String input = scanner.nextLine();
       if("q".equals(input.trim())) {
          break;
      }
    }
    System.exit(0);
}

主方法启动集成上下文,并接受命令行输入 "q" 退出程序。下面详细分析各组件。

5. Spring Integration 核心组件

5.1. 消息

org.springframework.integration.Message 接口定义了 Spring 消息——即 Spring Integration 上下文中的数据传输单元:

public interface Message<T> {
    T getPayload();
    MessageHeaders getHeaders();
}

提供两个关键元素的访问器:

  • 消息头:本质是键值容器,用于传输元数据(由 org.springframework.integration.MessageHeaders 类定义)
  • 消息负载:实际需要传输的有价值数据——本例中视频文件就是负载

5.2. 通道

Spring Integration(及整个 EAI)中的通道是集成架构的基础管道。它是消息在系统间传递的管道。

可将其视为字面意义的管道,集成系统或进程可通过它向其他系统推送(或接收)消息。

Spring Integration 提供多种通道类型,根据需求选择。它们大多可直接配置使用,无需自定义代码,但若有特殊需求,也提供了健壮的框架支持。

点对点(P2P)通道用于建立系统/组件间的 1 对 1 通信。一个组件向通道发布消息,另一个组件接收。通道两端只能各有一个组件。

配置通道只需返回 DirectChannel 实例:

@Bean
public MessageChannel fileChannel1() {
    return new DirectChannel();
}

@Bean
public MessageChannel fileChannel2() {
    return new DirectChannel();
}

@Bean
public MessageChannel fileChannel3() {
    return new DirectChannel();
}

这里定义了三个独立通道,通过各自的 getter 方法名标识。

发布-订阅(Pub-Sub)通道用于建立系统/组件间的 1 对多通信。这允许我们向之前创建的三个点对点通道同时发布消息。

将示例中的 P2P 通道替换为 Pub-Sub 通道:

@Bean
public MessageChannel pubSubFileChannel() {
    return new PublishSubscribeChannel();
}

@Bean
@InboundChannelAdapter(value = "pubSubFileChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
    FileReadingMessageSource sourceReader = new FileReadingMessageSource();
    sourceReader.setDirectory(new File(INPUT_DIR));
    sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
    return sourceReader;
}

现在入站通道适配器改为向 Pub-Sub 通道发布。这样就能将从源文件夹读取的文件发送到多个目标。

5.3. 桥接器

当两个消息通道或适配器无法直接连接时,Spring Integration 的桥接器用于建立连接。

本例中,我们用桥接器将 Pub-Sub 通道连接到三个不同的 P2P 通道(因为 P2P 和 Pub-Sub 通道不能直接连接):

@Bean
@BridgeFrom(value = "pubSubFileChannel")
public MessageChannel fileChannel1() {
    return new DirectChannel();
}

@Bean
@BridgeFrom(value = "pubSubFileChannel")
public MessageChannel fileChannel2() {
    return new DirectChannel();
}

@Bean
@BridgeFrom(value = "pubSubFileChannel")
public MessageChannel fileChannel3() {
    return new DirectChannel();
}

上述配置将 pubSubFileChannel 桥接到三个 P2P 通道。@BridgeFrom 注解定义桥接关系,可应用于任意数量的订阅通道。

代码可解读为:"创建从 pubSubFileChannelfileChannel1fileChannel2fileChannel3 的桥接,使 pubSubFileChannel 的消息能同时传递到所有三个通道"。

5.4. 服务激活器

服务激活器是在方法上标注 @ServiceActivator 注解的任意 POJO。当从入站通道收到消息时,可执行 POJO 中的任意方法,并允许将消息写入出站通道。

示例中,服务激活器从配置的 input channel 接收文件,并将其写入配置的文件夹。

5.5. 适配器

适配器是基于企业集成模式的组件,允许"插入"系统或数据源。它就像我们熟知的电源Socket或电子设备适配器。

它提供了对数据库、FTP 服务器、JMS/AMQP 等消息系统、Twitter 等社交网络等"黑盒"系统的可复用连接能力。由于连接这些系统的需求普遍存在,适配器具有高度可移植性和可复用性(实际上有一个小型适配器目录,可免费使用)。

适配器分为两大类——入站和出站。

结合示例场景分析这两类:

入站适配器用于从外部系统(本例中为文件系统目录)获取消息。

入站适配器配置包含:

  • @InboundChannelAdapter 注解:标记 bean 配置为适配器——配置适配器发送消息的通道(本例为 MPEG 文件)和轮询器(组件按指定间隔轮询配置文件夹)
  • 标准 Spring Java 配置:返回 FileReadingMessageSource(处理文件系统轮询的 Spring Integration 实现类)

出站适配器用于向外发送消息。Spring Integration 为各种常见用例提供了大量现成的出站适配器。

6. 总结

我们通过 Spring Integration 的基础用例,展示了该库的 Java 配置方式和组件的可复用性。

Spring Integration 代码既可作为独立项目部署在 JavaSE 环境,也可作为 Jakarta EE 环境中大型应用的一部分。虽然它不直接与企业服务总线(ESB)等 EAI 产品竞争,但为解决许多同类问题提供了一个轻量级的可行替代方案。


原始标题:Introduction to Spring Integration | Baeldung