1. 概述

本文将介绍 Apache Camel 支持的几种核心企业集成模式(EIP)。这些模式为系统间的标准化集成提供了现成的解决方案。

如果你需要先回顾 Apache Camel 的基础知识,建议先阅读这篇入门文章打好基础。

2. 关于企业集成模式(EIP)

企业集成模式是专门解决集成挑战的设计模式。Camel 实现了其中大部分模式,完整支持列表可参考官方文档

本文重点讲解以下五种模式:

  • 基于内容的路由器(Content Based Router)
  • 消息转换器(Message Translator)
  • 多播(Multicast)
  • 分割器(Splitter)
  • 死信通道(Dead Letter Channel)

ContentBasedFileRouter 类中,我们定义了路由逻辑:根据文件扩展名将源文件夹中的文件分发到两个不同的目标文件夹。

除了使用 Spring XML 配置,我们也可以采用 Spring Java Config 方式。需要添加以下依赖:

<dependency>    
    <groupId>org.apache.camel.springboot</groupId>    
    <artifactId>camel-spring-boot</artifactId>    
    <version>3.21.0</version>    
</dependency>

最新版本可在Maven 仓库查询。

接着扩展 CamelConfiguration 类并覆盖 routes() 方法:

@Configuration
public class ContentBasedFileRouterConfig {

    @Bean
    ContentBasedFileRouter getContentBasedFileRouter() {
        return new ContentBasedFileRouter();
    }

    public List<RouteBuilder> routes() {
        return Arrays.asList(getContentBasedFileRouter());
    }
}

通过 simple() DSL 语句使用简单表达式语言评估文件扩展名。这里还用 otherwise() 处理不满足 when() 条件的消息。

3. 消息转换器

不同系统使用不同数据格式时,常需将消息转换为目标系统支持的格式。

Camel 的 MessageTranslator 路由器支持三种转换方式: ✅ 在路由逻辑中使用自定义处理器 ✅ 通过特定 Bean 执行转换 ✅ 使用 transform() DSL 语句

自定义处理器的示例可参考前文,我们在处理器中为文件名添加了时间戳。

下面演示使用 transform() 的实现:

public class MessageTranslatorFileRouter extends RouteBuilder {
    private static final String SOURCE_FOLDER 
      = "src/test/source-folder";
    private static final String DESTINATION_FOLDER 
      = "src/test/destination-folder";

    @Override
    public void configure() throws Exception {
        from("file://" + SOURCE_FOLDER + "?delete=true")
          .transform(body().append(header(Exchange.FILE_NAME)))
          .to("file://" + DESTINATION_FOLDER);
    }
}

这个示例将源文件夹中每个文件的内容追加文件名,然后移动到目标文件夹。

4. 多播模式

多播允许 将同一条消息路由到多个不同接口,并分别处理

通过 multicast() DSL 语句实现,在语句块中列出接口和处理步骤。

⚠️ 默认情况下不同接口的处理是串行的,可通过 parallelProcessing() 改为并行。

Camel 默认使用最后一个回复作为输出消息,但可通过自定义聚合策略组装多播回复。

看个实际例子:将源文件夹的文件多播到两个路由,分别转换内容后存到不同目标文件夹。这里用 direct: 组件 连接路由:

public class MulticastFileRouter extends RouteBuilder {
    private static final String SOURCE_FOLDER 
      = "src/test/source-folder";
    private static final String DESTINATION_FOLDER_WORLD 
      = "src/test/destination-folder-world";
    private static final String DESTINATION_FOLDER_HELLO 
      = "src/test/destination-folder-hello";

    @Override
    public void configure() throws Exception {
        from("file://" + SOURCE_FOLDER + "?delete=true")
          .multicast()
          .to("direct:append", "direct:prepend").end();

        from("direct:append")
          .transform(body().append("World"))
          .to("file://" + DESTINATION_FOLDER_WORLD);

        from("direct:prepend")
           .transform(body().prepend("Hello"))
           .to("file://" + DESTINATION_FOLDER_HELLO);
    }
}

5. 分割器模式

分割器允许 将消息拆分成多个片段,分别独立处理。通过 split() DSL 语句实现。

❌ 与多播不同:分割器会修改原始消息,而多播保持原消息不变。

示例演示:将文件按行分割,每行生成独立文件,文件名等于文件内容:

public class SplitterFileRouter extends RouteBuilder {
    private static final String SOURCE_FOLDER 
      = "src/test/source-folder";
    private static final String DESTINATION_FOLDER  
      = "src/test/destination-folder";

    @Override
    public void configure() throws Exception {
        from("file://" + SOURCE_FOLDER + "?delete=true")
          .split(body().convertToString().tokenize("\n"))
          .setHeader(Exchange.FILE_NAME, body())
          .to("file://" + DESTINATION_FOLDER);
    }
}

6. 死信通道

系统故障(如数据库死锁)可能导致消息传递失败。但有时延迟重试能解决问题。

死信通道控制失败消息的处理方式,可指定:

  • 是否向调用方传播异常
  • 失败 Exchange 的路由目标

当消息传递失败时,死信通道会将其转移到死信接口。

示例通过抛出异常演示:

public class DeadLetterChannelFileRouter extends RouteBuilder {
    private static final String SOURCE_FOLDER 
      = "src/test/source-folder";

    @Override
    public void configure() throws Exception {
        errorHandler(deadLetterChannel("log:dead?level=ERROR")
          .maximumRedeliveries(3).redeliveryDelay(1000)
          .retryAttemptedLogLevel(LoggingLevel.ERROR));

        from("file://" + SOURCE_FOLDER + "?delete=true")
          .process(exchange -> {
            throw new IllegalArgumentException("Exception thrown!");
        });
    }
}

这里定义了错误处理器:

  • 记录失败传递
  • 设置重试策略(最多3次,间隔1秒)
  • 通过 retryAttemptedLogLevel() 指定重试日志级别

需额外配置日志记录器才能完整运行。测试后控制台输出如下:

ERROR DeadLetterChannel:156 - Failed delivery for 
(MessageId: ID-ZAG0025-50922-1481340325657-0-1 on 
ExchangeId: ID-ZAG0025-50922-1481340325657-0-2). 
On delivery attempt: 0 caught: java.lang.IllegalArgumentException: 
Exception thrown!
ERROR DeadLetterChannel:156 - Failed delivery for 
(MessageId: ID-ZAG0025-50922-1481340325657-0-1 on 
ExchangeId: ID-ZAG0025-50922-1481340325657-0-2). 
On delivery attempt: 1 caught: java.lang.IllegalArgumentException: 
Exception thrown!
ERROR DeadLetterChannel:156 - Failed delivery for 
(MessageId: ID-ZAG0025-50922-1481340325657-0-1 on 
ExchangeId: ID-ZAG0025-50922-1481340325657-0-2). 
On delivery attempt: 2 caught: java.lang.IllegalArgumentException: 
Exception thrown!
ERROR DeadLetterChannel:156 - Failed delivery for 
(MessageId: ID-ZAG0025-50922-1481340325657-0-1 on 
ExchangeId: ID-ZAG0025-50922-1481340325657-0-2). 
On delivery attempt: 3 caught: java.lang.IllegalArgumentException: 
Exception thrown!
ERROR dead:156 - Exchange[ExchangePattern: InOnly, 
BodyType: org.apache.camel.component.file.GenericFile, 
Body: [Body is file based: GenericFile[File.txt]]]

每次重试都会记录失败的 Exchange 信息。

7. 总结

本文介绍了 Apache Camel 的集成模式,并通过示例演示了它们的用法。这些模式能有效解决常见的集成挑战。

完整代码可在GitHub 仓库获取。


原始标题:Integration Patterns With Apache Camel