1. 概述

Spring Integration 提供了对多种企业集成模式(Enterprise Integration Patterns)的支持,其中一种方式就是通过 DSL 来简化配置。

在本篇文章中,我们将重点介绍如何使用 DSL 中的 子流(Subflow) 来优化配置流程。

2. 我们的需求

假设我们有一组整数序列,需要将它们按照模 3 的结果分为三类:

  • 余数为 0 的数字(如 0, 3, 6, 9)进入 multipleOfThreeChannel
  • 余数为 1 的数字(如 1, 4, 7, 10)进入 remainderIsOneChannel
  • 余数为 2 的数字(如 2, 5, 8, 11)进入 remainderIsTwoChannel

为了展示子流的优势,我们先看看不使用子流时的写法,然后再用子流进行简化。我们会使用以下组件:

  • publishSubscribeChannel
  • routeToRecipients
  • Filter 实现 if-then 逻辑
  • Router 实现 switch 逻辑

3. 前置准备

在定义子流之前,我们先创建三个输出通道。为了方便演示,使用 QueueChannel

@EnableIntegration
@IntegrationComponentScan
public class SubflowsConfiguration {
 
    @Bean
    QueueChannel multipleOfThreeChannel() {
        return new QueueChannel();
    }

    @Bean
    QueueChannel remainderIsOneChannel() {
        return new QueueChannel();
    }

    @Bean
    QueueChannel remainderIsTwoChannel() {
        return new QueueChannel();
    }

    boolean isMultipleOfThree(Integer number) {
       return number % 3 == 0;
    }

    boolean isRemainderOne(Integer number) {
        return number % 3 == 1;
    }

    boolean isRemainderTwo(Integer number) {
        return number % 3 == 2;
    }
}

这些通道最终会接收被分类后的数字。

同时,为了代码可读性,我们定义了几个辅助方法。

4. 不使用子流的做法

4.1. 定义多个 IntegrationFlow 组件

不使用子流的情况下,我们需要为每种分类分别定义一个 IntegrationFlow

@Bean
public IntegrationFlow multipleOfThreeFlow() {
    return flow -> flow.split()
      .<Integer> filter(this::isMultipleOfThree)
      .channel("multipleOfThreeChannel");
}

这段流程包含两个组件:一个 Splitter 和一个 Filter。Splitter 的作用是将输入的集合拆分成多个独立消息,而 Filter 则筛选出符合条件的消息。

类似地,我们可以再定义两个 Flow。

4.2. 使用 Messaging Gateway

每个 Flow 都需要一个 @MessagingGateway 来作为入口:

@MessagingGateway
public interface NumbersClassifier {

    @Gateway(requestChannel = "multipleOfThreeFlow.input")
    void multipleOfThree(Collection<Integer> numbers);

    @Gateway(requestChannel = "remainderIsOneFlow.input")
    void remainderIsOne(Collection<Integer> numbers);

    @Gateway(requestChannel = "remainderIsTwoFlow.input")
    void remainderIsTwo(Collection<Integer> numbers);
}

每个方法通过 @Gateway 注解绑定到对应的 Flow 输入通道。

4.3. 发送消息并验证输出

测试代码如下:

@Test
public void whenSendMessagesToMultipleOf3Flow_thenOutputMultiplesOf3() {
    numbersClassifier.multipleOfThree(Arrays.asList(1, 2, 3, 4, 5, 6));
    Message<?> outMessage = multipleOfThreeChannel.receive(0);
    assertEquals(outMessage.getPayload(), 3);
    outMessage = multipleOfThreeChannel.receive(0);
    assertEquals(outMessage.getPayload(), 6);
    outMessage = multipleOfThreeChannel.receive(0);
    assertNull(outMessage);
}

✅ 这里我们发送的是一个 List,所以需要 Splitter 拆分。

⚠️ 注意:这种方式需要维护三个 Flow 和三个 Gateway 方法,比较啰嗦。

5. 使用 publishSubscribeChannel

使用 publishSubscribeChannel() 可以将消息广播给多个订阅者(subflow),这样我们只需要一个 Flow:

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .publishSubscribeChannel(subscription -> 
           subscription
             .subscribe(subflow -> subflow
               .<Integer> filter(this::isMultipleOfThree)
               .channel("multipleOfThreeChannel"))
             .subscribe(subflow -> subflow
                .<Integer> filter(this::isRemainderOne)
                .channel("remainderIsOneChannel"))
             .subscribe(subflow -> subflow
                .<Integer> filter(this::isRemainderTwo)
                .channel("remainderIsTwoChannel")));
}

✅ 子流是匿名的,不能独立访问。

更新 Gateway:

@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);

现在我们只需要一个方法即可完成所有分类:

@Test
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
    numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));
    // 后续验证逻辑同上
}

6. 使用 routeToRecipients

另一种方式是使用 routeToRecipients,它支持内置过滤逻辑。

6.1. 使用 recipient

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .routeToRecipients(route -> route
          .<Integer> recipient("multipleOfThreeChannel", 
            this::isMultipleOfThree)       
          .<Integer> recipient("remainderIsOneChannel", 
            this::isRemainderOne)
          .<Integer> recipient("remainderIsTwoChannel", 
            this::isRemainderTwo));
}

✅ 可以根据条件动态选择接收通道。

6.2. 使用 recipientFlow

也可以像 publishSubscribeChannel 一样定义完整子流:

.routeToRecipients(route -> route
  .recipientFlow(subflow -> subflow
      .<Integer> filter(this::isMultipleOfThree)
      .channel("multipleOfThreeChannel"))
  ...);

7. 使用 if-then 逻辑(Filter)

Filter 不仅可以定义通过条件,还可以定义被丢弃的消息去向:

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .<Integer> filter(this::isMultipleOfThree, 
           notMultiple -> notMultiple
             .discardFlow(oneflow -> oneflow
               .<Integer> filter(this::isRemainderOne,
                 twoflow -> twoflow
                   .discardChannel("remainderIsTwoChannel"))
               .channel("remainderIsOneChannel"))
        .channel("multipleOfThreeChannel");
}

✅ 可以将 discard 流看作 else 分支。

8. 使用 switch 逻辑(Router)

8.1. 使用 channelMapping

@Bean
public IntegrationFlow classify() {
    return classify -> classify.split()
      .<Integer, Integer> route(number -> number % 3, 
        mapping -> mapping
         .channelMapping(0, "multipleOfThreeChannel")
         .channelMapping(1, "remainderIsOneChannel")
         .channelMapping(2, "remainderIsTwoChannel"));
}

8.2. 使用 subFlowMapping

.subFlowMapping(1, subflow -> subflow.channel("remainderIsOneChannel"))

或者使用 handle 方法:

.subFlowMapping(2, subflow -> subflow
  .<Integer> handle((payload, headers) -> {
      // do extra work on the payload
     return payload;
  }))).channel("remainderIsTwoChannel");

9. 总结

在这篇文章中,我们通过多个示例展示了如何使用子流来简化 Spring Integration 的配置流程。子流不仅减少了重复代码,还提高了流程的可读性和可维护性。

✅ 推荐使用子流来简化复杂的路由和过滤逻辑。

完整代码可在 GitHub 获取。


原始标题:Using Subflows in Spring Integration