1. 引言

本文将介绍 Dapr 是什么、如何与 Spring Boot 集成,以及如何创建不依赖特定消息代理的发布/订阅系统。我们将通过一个网约车场景来演示:用户发起乘车请求,驱动订阅这些请求。最终,我们将实现无需 Dapr CLI 或外部基础设施即可运行的测试。

2. 使用 Dapr 实现基础设施无关架构

分布式系统通常面临一些常见但复杂的挑战。我们通常使用特定供应商的库、基础设施工具和手动集成工作来解决这些问题。

Dapr(分布式应用运行时)提供了一套 API 和构建块来解决这些挑战,它抽象了基础设施细节,让我们能专注于业务逻辑。这些原则同样适用于其他场景,比如调用其他服务(通过服务调用 API)、持久化状态(通过状态管理 API)或获取密钥(通过密钥 API)。

这种解耦使应用更易于测试、跨环境更易移植,且对基础设施变更更具弹性。本文将重点介绍发布/订阅 API,通过实践展示这些优势。

2.1. 桥接 Spring 消息与 Dapr

Spring Boot 有高度集成的模型,尤其在消息传递方面。许多开发者已经熟悉 Spring 的抽象,如*KafkaTemplate*、RabbitTemplate及其监听器。虽然这些简化了代理集成,但它们仍与特定技术紧密耦合。

dapr-spring-boot-starter项目并非只是另一个 API,而是提供了无缝集成。它使用命名熟悉的接口,如DaprMessagingTemplate@Topic。这些使我们能轻松利用 Dapr 的分布式消息能力,无需了解底层基础设施细节。

更具体地说,通过引入 Dapr Spring Boot starter,我们不需要添加任何特定代理依赖。这实现了代理的灵活切换而无需修改代码。在组件级别配置特定供应商的功能也是可能的,无需更改应用代码。例如,我们可以包含Kafka 特定配置来利用消费者组等原生特性。

2.2. 实现基础设施灵活性无锁定

Dapr 将应用代码与基础设施解耦。例如,无论底层使用 Kafka、RabbitMQ、Redis Streams 还是 Azure Service Bus,我们的 Spring Boot 应用都通过 HTTP 或 gRPC 与 Dapr sidecar 通信,由 Dapr 处理与实际代理的集成。

最重要的是,我们可以在没有完整基础设施的情况下进行本地测试,正如我们将使用Testcontainers看到的那样。dapr-spring-boot-starter-test模块在测试生命周期中启动 Dapr sidecar,无需学习Dapr CLI或 Kubernetes。

3. 设置 Spring Boot 项目

我们将模拟一个网约车应用来演示 Dapr 如何与 Spring Boot 集成。用户向我们的 API 接口发送乘车请求,该请求将消息发布给订阅的驱动。驱动随后可以选择是否接受订单。

首先添加所需依赖。我们需要spring-boot-starter-web用于 REST 接口,以及dapr-spring-boot-starter用于 Spring Boot 集成:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>io.dapr.spring</groupId>
    <artifactId>dapr-spring-boot-starter</artifactId>
</dependency>

对于测试,我们还将添加dapr-spring-boot-starter-test以支持 Testcontainers,以及RabbitMQ容器作为消息代理:

<dependency>
    <groupId>io.dapr.spring</groupId>
    <artifactId>dapr-spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency> 
    <groupId>org.testcontainers</groupId> 
    <artifactId>rabbitmq</artifactId>
    <version>1.20.6</version>
    <scope>test</scope> 
</dependency>

3.1. 创建模型

这个 POJO 表示乘车请求:

public class RideRequest {
    private String passengerId;
    private String location;
    private String destination;

    // 默认 getter 和 setter
}

它不需要为消息传递添加特殊注解。

4. 使用 DaprMessagingTemplate 实现发布者

DaprMessagingTemplate 类似于 Spring 的其他消息模板,但不需要特定代理作为依赖。首先在 application.properties 中定义消息组件的名称:

dapr.pubsub.name=ride-hailing

然后,我们将使用 DaprPubSubProperties 类引用该属性,并将 RideRequest 作为消息类型。这样就完成了发送消息所需的配置:

@Configuration
@EnableConfigurationProperties({ DaprPubSubProperties.class })
public class DaprMessagingConfig {

    @Bean
    public DaprMessagingTemplate<RideRequest> messagingTemplate(
      DaprClient client, DaprPubSubProperties config) {
        return new DaprMessagingTemplate<>(client, config.getName(), false);
    }
}

4.1. 通过接口接收消息

接下来,创建一个控制器接收乘车请求,并使用 Dapr 模板将其转发到“ride-requests”主题。控制器可以映射到任意路径:

@RestController
@RequestMapping("/passenger")
public class PassengerRestController {

    @Autowired
    private DaprMessagingTemplate<RideRequest> messaging;

    @PostMapping("/request-ride")
    public String requestRide(@RequestBody RideRequest request) {
        messaging.send("ride-requests", request);
        return "等待驱动响应";
    }
}

注意,我们的消息体不需要任何转换或配置,因为 Dapr 会自动处理。

5. 创建和配置订阅者

在我们的例子中,驱动作为订阅者,接收乘车请求并决定是否接受。我们将使用 Dapr 的 @Topic 注解将传入消息绑定到控制器方法。

5.1. 使用 @Topic 实现控制器

使用@Topic注解时,必须同时指定组件和主题名称。**Dapr sidecar(由测试容器自动处理)在从代理转发消息时会调用此接口:**

@RestController
@RequestMapping("driver")
public class DriverRestController {

    // ...

    @PostMapping("ride-request")
    @Topic(pubsubName = "ride-hailing", name = "ride-requests")
    public void onRideRequest(@RequestBody CloudEvent<RideRequest> event) {
        // ...
    }
}

注意,负载被包装在CloudEvent对象中,由 Dapr 自动创建。这对于基于 CloudEvent 元数据进行路由或过滤等高级场景很有用,但在基本发布/订阅中不是必需的。

5.2. 配置订阅者行为

订阅者代表接受或拒绝订单的驱动。为演示,我们将使用简单的模式逻辑判断订单是否可接受。在 application.properties 中添加此配置,以便在启动应用时轻松更改:

driver.acceptance.criteria=East Side

接下来,将此值注入控制器变量,并添加计数器记录接受的/拒绝的订单数:

int drivesAccepted;
int drivesRejected;

@Value("${driver.acceptance.criteria}")
String criteria;

public int getDrivesAccepted() {
    return drivesAccepted;
}

public int getDrivesRejected() {
    return drivesRejected;
}

我们将在编写测试时使用这些变量来检查控制器行为。

5.3. 处理 CloudEvent

最后,从 CloudEvent 中提取负载并判断订单是否可接受:

@Topic(pubsubName = "ride-hailing", name = "ride-requests")
public void onRideRequest(@RequestBody CloudEvent<RideRequest> event) {
    RideRequest request = event.getData();

    if (request.getDestination().contains(criteria)) {
        drivesAccepted++;
    } else {
        drivesRejected++;
        throw new UnsupportedOperationException("订单已拒绝");
    }
}

由于无法直接拒绝消息,我们抛出异常来触发消息重新排队。对于 RabbitMQ,这需要 requeueInFailure 配置,我们将在创建测试容器时设置它。

6. 使用 Testcontainers 测试发布者

为验证发布者正确发送消息,我们将使用Testcontainers编写集成测试。这使我们可以启动 Dapr sidecar 和 RabbitMQ 实例,而无需依赖外部工具或 Dapr CLI。

6.1. 设置测试配置

对于测试属性,除了接受标准,我们还将包含消息组件名称和 Dapr 容器的专用服务器端口。

此外,我们需要选择固定端口,以便组件能在同一网络内相互定位:

driver.acceptance.criteria=East Side
dapr.pubsub.name=ride-hailing
server.port=60601

通过设置服务器端口号和指定组件间共享的网络开始配置。我们还将包含 DaprPubSubProperties 以稍后获取 RabbitMQ 组件名称:

@TestConfiguration(proxyBeanMethods = false)
@EnableConfigurationProperties({ DaprPubSubProperties.class })
public class DaprTestContainersConfig {

    @Value("${server.port}")
    private int serverPort;

    @Bean
    public Network daprNetwork() {
        return Network.newNetwork();
    }

    // ...
}

6.2. 配置容器

创建暴露默认端口 5672 的 RabbitMQ 容器:

@Bean
public RabbitMQContainer rabbitMQContainer(Network daprNetwork) {
    return new RabbitMQContainer(DockerImageName.parse("rabbitmq:3.7.25-management-alpine"))
      .withExposedPorts(5672)
      .withNetworkAliases("rabbitmq")
      .withNetwork(daprNetwork);
}

最后,添加一个 Dapr 容器封装所有内容,使用@ServiceConnection注解简化配置:

@Bean
@ServiceConnection
public DaprContainer daprContainer(
  Network daprNetwork, RabbitMQContainer rabbitMQ, DaprPubSubProperties pubSub) {
    Map<String, String> rabbitMqConfig = new HashMap<>();
    rabbitMqConfig.put("connectionString", "amqp://guest:guest@rabbitmq:5672");
    rabbitMqConfig.put("user", "guest");
    rabbitMqConfig.put("password", "guest");
    rabbitMqConfig.put("requeueInFailure", "true");

    return new DaprContainer("daprio/daprd:1.14.4")
      .withAppName("dapr-pubsub")
      .withNetwork(daprNetwork)
      .withComponent(new Component(pubSub.getName(), "pubsub.rabbitmq", "v1", rabbitMqConfig))
      .withAppPort(serverPort)
      .withAppChannelAddress("host.testcontainers.internal")
      .dependsOn(rabbitMQ);
}

除了模板代码,关键配置包括:

  • requeueInFailure:启用此选项,因为我们无法直接NACK消息。当有多个订阅者实例时,这允许其他客户端接收被其他客户端拒绝的消息。
  • *withComponent(…”pubsub.rabbitmq”)*:我们使用 RabbitMQ 实现,因此在此指定。Dapr 支持多种消息代理,包括云提供商托管服务,如Google PubSubAmazon SQS/SNSAzure Event Hub
  • withAppChannelAddress:包含此项以启用对容器的主机访问。没有它,测试可能在等待 Dapr 响应时挂起。

我们还可以通过日志配置启动 Dapr 容器,使调试更容易。为此,设置 withDaprLogLevelwithLogConsumer 选项:

.withDaprLogLevel(DaprLogLevel.INFO) 
.withLogConsumer(outputFrame -> logger.info(outputFrame.getUtf8String()))

6.3. 创建测试应用

现在可以在测试包中创建测试应用:

@SpringBootApplication
public class DaprPublisherTestApp {

    public static void main(String[] args) {
        SpringApplication.from(DaprPublisherApp::main)
          .with(DaprTestContainersConfig.class)
          .run(args);
    }
}

我们将引用主应用类以避免重复任何配置(如 DaprMessagingConfig 类)。我们还需要将 DriverRestController 复制到测试文件夹用于集成测试。

6.4. 创建集成测试

需要引用测试应用、配置和 DaprAutoConfiguration 类。然后注入控制器检查控制变量,并注入 Dapr 容器以了解应用何时准备好接收消息:

@SpringBootTest(
  classes = { 
    DaprPublisherTestApp.class, 
    DaprTestContainersConfig.class, 
    DaprAutoConfiguration.class }, 
  webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class DaprPublisherIntegrationTest {

    @Autowired
    DriverRestController controller;

    @Autowired
    DaprContainer daprContainer;

    @Value("${server.port}")
    int serverPort;

    @Value("${driver.acceptance.criteria}")
    String criteria;

    // ...
}

由于需要验证容器是否正确启动,我们可以等待“app is subscribed to the following topics”消息。这有助于确保测试仅在容器准备好接收消息时开始。我们还将定义 API 的基本 URI 以使用RestAssured进行调用:

@BeforeEach
void setUp() {
    RestAssured.baseURI = "http://localhost:" + serverPort;
    org.testcontainers.Testcontainers.exposeHostPorts(serverPort);

    Wait.forLogMessage(".*app is subscribed to the following topics.*", 1)
      .waitUntilReady(daprContainer);
}

第一个测试涉及发布符合驱动接受标准的订单请求,并检查接受的订单数。当该数字增加时,我们可以断言订阅者处理了消息:

@Test
void whenDriveAcceptable_thenDrivesAcceptedIncrease() {
    int drivesAccepted = controller.getDrivesAccepted();

    given()
      .contentType(ContentType.JSON)
      .body("""
        {
          "passengerId": "1",
          "location": "Point A",
          "destination": "%s Point B"
        }
      """.formatted(criteria))
    .when()
      .post("/passenger/request-ride")
    .then()
      .statusCode(200);

    await()
      .atMost(Duration.ofSeconds(5))
      .until(controller::getDrivesAccepted, equalTo(drivesAccepted + 1));
}

相反,第二个测试涉及发布驱动应拒绝的订单请求:

@Test
void whenDriveUnacceptable_thenDrivesRejectedIncrease() {
    int drivesRejected = controller.getDrivesRejected();

    given().contentType(ContentType.JSON)
      .body("""
        {
          "passengerId": "2",
          "location": "Point B",
          "destination": "West Side A"
        }
      """)
    .when()
      .post("/passenger/request-ride")
    .then()
      .statusCode(200);

    await()
      .atMost(Duration.ofSeconds(5))
      .until(controller::getDrivesRejected, greaterThan(drivesRejected));
}

这次我们测试拒绝的订单数是否增加。此外,由于消息在出错时重新排队,我们验证变量大于初始值,因为我们无法确定它被处理了多少次。

7. 使用 Testcontainers 测试订阅者

现在测试订阅者行为。我们将创建与发布者类似的设置,重点验证订阅者如何处理传入消息。

7.1. 设置环境

首先包含类似的测试属性,仅更改服务器端口:

driver.acceptance.criteria=East Side
dapr.pubsub.name=ride-hailing
server.port=60602

DaprMessagingConfig 类复制到测试包,以便在集成测试中使用。还需要将DaprTestContainersConfig 复制到测试文件夹,因为我们需要相同的容器。

7.2. 创建集成测试

与之前的集成测试类似,我们需要连接容器、控制器、服务器端口、驱动接受标准,并在 @Setup 期间等待容器就绪。还需要包含 Dapr 消息模板向订阅者发送消息:

@SpringBootTest(
  classes = { 
    DaprSubscriberTestApp.class, 
    DaprTestContainersConfig.class, 
    DaprMessagingConfig.class, 
    DaprAutoConfiguration.class }, 
  webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class DaprSubscriberIntegrationTest {

    @Autowired
    DaprMessagingTemplate<RideRequest> messaging;

    @Autowired
    DriverRestController controller;

    @Autowired
    DaprContainer daprContainer;

    @Value("${server.port}")
    int serverPort;

    @Value("${driver.acceptance.criteria}")
    String criteria;

    // 测试设置...
}

7.3. 实现测试场景

第一个测试涉及发送可接受的订单,并检查控制器是否正确接收:

@Test
void whenDriveAcceptable_thenDrivesAcceptedIncrease() {
    int drivesAccepted = controller.getDrivesAccepted();

    RideRequest ride = new RideRequest(
      "1", "Point A", String.format("%s Point B", criteria));
    messaging.send("ride-requests", ride);

    await().atMost(Duration.ofSeconds(5))
      .until(controller::getDrivesAccepted, equalTo(drivesAccepted + 1));
}

第二个测试包括发送不可接受的订单,并检查控制器是否正确拒绝:

@Test
void whenDriveUnacceptable_thenDrivesRejectedIncrease() {
    int drivesRejected = controller.getDrivesRejected();

    RideRequest request = new RideRequest("2", "Point B", "West Side Point A");
    messaging.send("ride-requests", request);

    await().atMost(Duration.ofSeconds(5))
      .until(controller::getDrivesRejected, greaterThan(drivesRejected));
}

通过订阅者测试,我们验证了 Dapr 正确将消息从代理路由到我们的 Spring Boot 应用,且订阅者行为符合预期。

8. 结论

在本文中,我们使用 Spring Boot 和 Dapr 构建了一个松耦合的发布/订阅消息系统。通过利用 Dapr 对消息代理的抽象及其 Spring Boot 集成,我们简化了消息逻辑而不绑定到特定基础设施。我们还演示了如何使用 Testcontainers 在本地运行和测试整个设置,在开发过程中实现快速反馈循环。

一如既往,源代码可在GitHub上获取。


原始标题:Flexible Pub/Sub Messaging With Spring Boot and Dapr | Baeldung