1. 概述
本文将介绍 Apache Camel 支持的几种核心企业集成模式(EIP)。这些模式为系统间的标准化集成提供了现成的解决方案。
如果你需要先回顾 Apache Camel 的基础知识,建议先阅读这篇入门文章打好基础。
2. 关于企业集成模式(EIP)
企业集成模式是专门解决集成挑战的设计模式。Camel 实现了其中大部分模式,完整支持列表可参考官方文档。
本文重点讲解以下五种模式:
- 基于内容的路由器(Content Based Router)
- 消息转换器(Message Translator)
- 多播(Multicast)
- 分割器(Splitter)
- 死信通道(Dead Letter Channel)
在 ContentBasedFileRouter
类中,我们定义了路由逻辑:根据文件扩展名将源文件夹中的文件分发到两个不同的目标文件夹。
除了使用 Spring XML 配置,我们也可以采用 Spring Java Config 方式。需要添加以下依赖:
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot</artifactId>
<version>3.21.0</version>
</dependency>
最新版本可在Maven 仓库查询。
接着扩展 CamelConfiguration
类并覆盖 routes()
方法:
@Configuration
public class ContentBasedFileRouterConfig {
@Bean
ContentBasedFileRouter getContentBasedFileRouter() {
return new ContentBasedFileRouter();
}
public List<RouteBuilder> routes() {
return Arrays.asList(getContentBasedFileRouter());
}
}
通过 simple()
DSL 语句使用简单表达式语言评估文件扩展名。这里还用 otherwise()
处理不满足 when()
条件的消息。
3. 消息转换器
不同系统使用不同数据格式时,常需将消息转换为目标系统支持的格式。
Camel 的 MessageTranslator
路由器支持三种转换方式:
✅ 在路由逻辑中使用自定义处理器
✅ 通过特定 Bean 执行转换
✅ 使用 transform()
DSL 语句
自定义处理器的示例可参考前文,我们在处理器中为文件名添加了时间戳。
下面演示使用 transform()
的实现:
public class MessageTranslatorFileRouter extends RouteBuilder {
private static final String SOURCE_FOLDER
= "src/test/source-folder";
private static final String DESTINATION_FOLDER
= "src/test/destination-folder";
@Override
public void configure() throws Exception {
from("file://" + SOURCE_FOLDER + "?delete=true")
.transform(body().append(header(Exchange.FILE_NAME)))
.to("file://" + DESTINATION_FOLDER);
}
}
这个示例将源文件夹中每个文件的内容追加文件名,然后移动到目标文件夹。
4. 多播模式
多播允许 将同一条消息路由到多个不同接口,并分别处理。
通过 multicast()
DSL 语句实现,在语句块中列出接口和处理步骤。
⚠️ 默认情况下不同接口的处理是串行的,可通过 parallelProcessing()
改为并行。
Camel 默认使用最后一个回复作为输出消息,但可通过自定义聚合策略组装多播回复。
看个实际例子:将源文件夹的文件多播到两个路由,分别转换内容后存到不同目标文件夹。这里用 direct:
组件 连接路由:
public class MulticastFileRouter extends RouteBuilder {
private static final String SOURCE_FOLDER
= "src/test/source-folder";
private static final String DESTINATION_FOLDER_WORLD
= "src/test/destination-folder-world";
private static final String DESTINATION_FOLDER_HELLO
= "src/test/destination-folder-hello";
@Override
public void configure() throws Exception {
from("file://" + SOURCE_FOLDER + "?delete=true")
.multicast()
.to("direct:append", "direct:prepend").end();
from("direct:append")
.transform(body().append("World"))
.to("file://" + DESTINATION_FOLDER_WORLD);
from("direct:prepend")
.transform(body().prepend("Hello"))
.to("file://" + DESTINATION_FOLDER_HELLO);
}
}
5. 分割器模式
分割器允许 将消息拆分成多个片段,分别独立处理。通过 split()
DSL 语句实现。
❌ 与多播不同:分割器会修改原始消息,而多播保持原消息不变。
示例演示:将文件按行分割,每行生成独立文件,文件名等于文件内容:
public class SplitterFileRouter extends RouteBuilder {
private static final String SOURCE_FOLDER
= "src/test/source-folder";
private static final String DESTINATION_FOLDER
= "src/test/destination-folder";
@Override
public void configure() throws Exception {
from("file://" + SOURCE_FOLDER + "?delete=true")
.split(body().convertToString().tokenize("\n"))
.setHeader(Exchange.FILE_NAME, body())
.to("file://" + DESTINATION_FOLDER);
}
}
6. 死信通道
系统故障(如数据库死锁)可能导致消息传递失败。但有时延迟重试能解决问题。
死信通道控制失败消息的处理方式,可指定:
- 是否向调用方传播异常
- 失败 Exchange 的路由目标
当消息传递失败时,死信通道会将其转移到死信接口。
示例通过抛出异常演示:
public class DeadLetterChannelFileRouter extends RouteBuilder {
private static final String SOURCE_FOLDER
= "src/test/source-folder";
@Override
public void configure() throws Exception {
errorHandler(deadLetterChannel("log:dead?level=ERROR")
.maximumRedeliveries(3).redeliveryDelay(1000)
.retryAttemptedLogLevel(LoggingLevel.ERROR));
from("file://" + SOURCE_FOLDER + "?delete=true")
.process(exchange -> {
throw new IllegalArgumentException("Exception thrown!");
});
}
}
这里定义了错误处理器:
- 记录失败传递
- 设置重试策略(最多3次,间隔1秒)
- 通过
retryAttemptedLogLevel()
指定重试日志级别
需额外配置日志记录器才能完整运行。测试后控制台输出如下:
ERROR DeadLetterChannel:156 - Failed delivery for
(MessageId: ID-ZAG0025-50922-1481340325657-0-1 on
ExchangeId: ID-ZAG0025-50922-1481340325657-0-2).
On delivery attempt: 0 caught: java.lang.IllegalArgumentException:
Exception thrown!
ERROR DeadLetterChannel:156 - Failed delivery for
(MessageId: ID-ZAG0025-50922-1481340325657-0-1 on
ExchangeId: ID-ZAG0025-50922-1481340325657-0-2).
On delivery attempt: 1 caught: java.lang.IllegalArgumentException:
Exception thrown!
ERROR DeadLetterChannel:156 - Failed delivery for
(MessageId: ID-ZAG0025-50922-1481340325657-0-1 on
ExchangeId: ID-ZAG0025-50922-1481340325657-0-2).
On delivery attempt: 2 caught: java.lang.IllegalArgumentException:
Exception thrown!
ERROR DeadLetterChannel:156 - Failed delivery for
(MessageId: ID-ZAG0025-50922-1481340325657-0-1 on
ExchangeId: ID-ZAG0025-50922-1481340325657-0-2).
On delivery attempt: 3 caught: java.lang.IllegalArgumentException:
Exception thrown!
ERROR dead:156 - Exchange[ExchangePattern: InOnly,
BodyType: org.apache.camel.component.file.GenericFile,
Body: [Body is file based: GenericFile[File.txt]]]
每次重试都会记录失败的 Exchange 信息。
7. 总结
本文介绍了 Apache Camel 的集成模式,并通过示例演示了它们的用法。这些模式能有效解决常见的集成挑战。
完整代码可在GitHub 仓库获取。