1. 概述
Spring Integration 提供了对多种企业集成模式(Enterprise Integration Patterns)的支持,其中一种方式就是通过 DSL 来简化配置。
在本篇文章中,我们将重点介绍如何使用 DSL 中的 子流(Subflow) 来优化配置流程。
2. 我们的需求
假设我们有一组整数序列,需要将它们按照模 3 的结果分为三类:
- 余数为 0 的数字(如 0, 3, 6, 9)进入
multipleOfThreeChannel
- 余数为 1 的数字(如 1, 4, 7, 10)进入
remainderIsOneChannel
- 余数为 2 的数字(如 2, 5, 8, 11)进入
remainderIsTwoChannel
为了展示子流的优势,我们先看看不使用子流时的写法,然后再用子流进行简化。我们会使用以下组件:
publishSubscribeChannel
routeToRecipients
Filter
实现 if-then 逻辑Router
实现 switch 逻辑
3. 前置准备
在定义子流之前,我们先创建三个输出通道。为了方便演示,使用 QueueChannel
:
@EnableIntegration
@IntegrationComponentScan
public class SubflowsConfiguration {
@Bean
QueueChannel multipleOfThreeChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsOneChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsTwoChannel() {
return new QueueChannel();
}
boolean isMultipleOfThree(Integer number) {
return number % 3 == 0;
}
boolean isRemainderOne(Integer number) {
return number % 3 == 1;
}
boolean isRemainderTwo(Integer number) {
return number % 3 == 2;
}
}
这些通道最终会接收被分类后的数字。
同时,为了代码可读性,我们定义了几个辅助方法。
4. 不使用子流的做法
4.1. 定义多个 IntegrationFlow
组件
不使用子流的情况下,我们需要为每种分类分别定义一个 IntegrationFlow
:
@Bean
public IntegrationFlow multipleOfThreeFlow() {
return flow -> flow.split()
.<Integer> filter(this::isMultipleOfThree)
.channel("multipleOfThreeChannel");
}
这段流程包含两个组件:一个 Splitter
和一个 Filter
。Splitter 的作用是将输入的集合拆分成多个独立消息,而 Filter 则筛选出符合条件的消息。
类似地,我们可以再定义两个 Flow。
4.2. 使用 Messaging Gateway
每个 Flow 都需要一个 @MessagingGateway
来作为入口:
@MessagingGateway
public interface NumbersClassifier {
@Gateway(requestChannel = "multipleOfThreeFlow.input")
void multipleOfThree(Collection<Integer> numbers);
@Gateway(requestChannel = "remainderIsOneFlow.input")
void remainderIsOne(Collection<Integer> numbers);
@Gateway(requestChannel = "remainderIsTwoFlow.input")
void remainderIsTwo(Collection<Integer> numbers);
}
每个方法通过 @Gateway
注解绑定到对应的 Flow 输入通道。
4.3. 发送消息并验证输出
测试代码如下:
@Test
public void whenSendMessagesToMultipleOf3Flow_thenOutputMultiplesOf3() {
numbersClassifier.multipleOfThree(Arrays.asList(1, 2, 3, 4, 5, 6));
Message<?> outMessage = multipleOfThreeChannel.receive(0);
assertEquals(outMessage.getPayload(), 3);
outMessage = multipleOfThreeChannel.receive(0);
assertEquals(outMessage.getPayload(), 6);
outMessage = multipleOfThreeChannel.receive(0);
assertNull(outMessage);
}
✅ 这里我们发送的是一个 List
,所以需要 Splitter 拆分。
⚠️ 注意:这种方式需要维护三个 Flow 和三个 Gateway 方法,比较啰嗦。
5. 使用 publishSubscribeChannel
使用 publishSubscribeChannel()
可以将消息广播给多个订阅者(subflow),这样我们只需要一个 Flow:
@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.publishSubscribeChannel(subscription ->
subscription
.subscribe(subflow -> subflow
.<Integer> filter(this::isMultipleOfThree)
.channel("multipleOfThreeChannel"))
.subscribe(subflow -> subflow
.<Integer> filter(this::isRemainderOne)
.channel("remainderIsOneChannel"))
.subscribe(subflow -> subflow
.<Integer> filter(this::isRemainderTwo)
.channel("remainderIsTwoChannel")));
}
✅ 子流是匿名的,不能独立访问。
更新 Gateway:
@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);
现在我们只需要一个方法即可完成所有分类:
@Test
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));
// 后续验证逻辑同上
}
6. 使用 routeToRecipients
另一种方式是使用 routeToRecipients
,它支持内置过滤逻辑。
6.1. 使用 recipient
@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.routeToRecipients(route -> route
.<Integer> recipient("multipleOfThreeChannel",
this::isMultipleOfThree)
.<Integer> recipient("remainderIsOneChannel",
this::isRemainderOne)
.<Integer> recipient("remainderIsTwoChannel",
this::isRemainderTwo));
}
✅ 可以根据条件动态选择接收通道。
6.2. 使用 recipientFlow
也可以像 publishSubscribeChannel
一样定义完整子流:
.routeToRecipients(route -> route
.recipientFlow(subflow -> subflow
.<Integer> filter(this::isMultipleOfThree)
.channel("multipleOfThreeChannel"))
...);
7. 使用 if-then
逻辑(Filter)
Filter 不仅可以定义通过条件,还可以定义被丢弃的消息去向:
@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.<Integer> filter(this::isMultipleOfThree,
notMultiple -> notMultiple
.discardFlow(oneflow -> oneflow
.<Integer> filter(this::isRemainderOne,
twoflow -> twoflow
.discardChannel("remainderIsTwoChannel"))
.channel("remainderIsOneChannel"))
.channel("multipleOfThreeChannel");
}
✅ 可以将 discard 流看作 else
分支。
8. 使用 switch
逻辑(Router)
8.1. 使用 channelMapping
@Bean
public IntegrationFlow classify() {
return classify -> classify.split()
.<Integer, Integer> route(number -> number % 3,
mapping -> mapping
.channelMapping(0, "multipleOfThreeChannel")
.channelMapping(1, "remainderIsOneChannel")
.channelMapping(2, "remainderIsTwoChannel"));
}
8.2. 使用 subFlowMapping
.subFlowMapping(1, subflow -> subflow.channel("remainderIsOneChannel"))
或者使用 handle
方法:
.subFlowMapping(2, subflow -> subflow
.<Integer> handle((payload, headers) -> {
// do extra work on the payload
return payload;
}))).channel("remainderIsTwoChannel");
9. 总结
在这篇文章中,我们通过多个示例展示了如何使用子流来简化 Spring Integration 的配置流程。子流不仅减少了重复代码,还提高了流程的可读性和可维护性。
✅ 推荐使用子流来简化复杂的路由和过滤逻辑。
完整代码可在 GitHub 获取。