1. 引言
本文将介绍 Dapr 是什么、如何与 Spring Boot 集成,以及如何创建不依赖特定消息代理的发布/订阅系统。我们将通过一个网约车场景来演示:用户发起乘车请求,驱动订阅这些请求。最终,我们将实现无需 Dapr CLI 或外部基础设施即可运行的测试。
2. 使用 Dapr 实现基础设施无关架构
分布式系统通常面临一些常见但复杂的挑战。我们通常使用特定供应商的库、基础设施工具和手动集成工作来解决这些问题。
Dapr(分布式应用运行时)提供了一套 API 和构建块来解决这些挑战,它抽象了基础设施细节,让我们能专注于业务逻辑。这些原则同样适用于其他场景,比如调用其他服务(通过服务调用 API)、持久化状态(通过状态管理 API)或获取密钥(通过密钥 API)。
这种解耦使应用更易于测试、跨环境更易移植,且对基础设施变更更具弹性。本文将重点介绍发布/订阅 API,通过实践展示这些优势。
2.1. 桥接 Spring 消息与 Dapr
Spring Boot 有高度集成的模型,尤其在消息传递方面。许多开发者已经熟悉 Spring 的抽象,如*KafkaTemplate*、RabbitTemplate及其监听器。虽然这些简化了代理集成,但它们仍与特定技术紧密耦合。
dapr-spring-boot-starter项目并非只是另一个 API,而是提供了无缝集成。它使用命名熟悉的接口,如DaprMessagingTemplate和@Topic。这些使我们能轻松利用 Dapr 的分布式消息能力,无需了解底层基础设施细节。
更具体地说,通过引入 Dapr Spring Boot starter,我们不需要添加任何特定代理依赖。这实现了代理的灵活切换而无需修改代码。在组件级别配置特定供应商的功能也是可能的,无需更改应用代码。例如,我们可以包含Kafka 特定配置来利用消费者组等原生特性。
2.2. 实现基础设施灵活性无锁定
Dapr 将应用代码与基础设施解耦。例如,无论底层使用 Kafka、RabbitMQ、Redis Streams 还是 Azure Service Bus,我们的 Spring Boot 应用都通过 HTTP 或 gRPC 与 Dapr sidecar 通信,由 Dapr 处理与实际代理的集成。
最重要的是,我们可以在没有完整基础设施的情况下进行本地测试,正如我们将使用Testcontainers看到的那样。dapr-spring-boot-starter-test模块在测试生命周期中启动 Dapr sidecar,无需学习Dapr CLI或 Kubernetes。
3. 设置 Spring Boot 项目
我们将模拟一个网约车应用来演示 Dapr 如何与 Spring Boot 集成。用户向我们的 API 接口发送乘车请求,该请求将消息发布给订阅的驱动。驱动随后可以选择是否接受订单。
首先添加所需依赖。我们需要spring-boot-starter-web用于 REST 接口,以及dapr-spring-boot-starter用于 Spring Boot 集成:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.dapr.spring</groupId>
<artifactId>dapr-spring-boot-starter</artifactId>
</dependency>
对于测试,我们还将添加dapr-spring-boot-starter-test以支持 Testcontainers,以及RabbitMQ容器作为消息代理:
<dependency>
<groupId>io.dapr.spring</groupId>
<artifactId>dapr-spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>rabbitmq</artifactId>
<version>1.20.6</version>
<scope>test</scope>
</dependency>
3.1. 创建模型
这个 POJO 表示乘车请求:
public class RideRequest {
private String passengerId;
private String location;
private String destination;
// 默认 getter 和 setter
}
它不需要为消息传递添加特殊注解。
4. 使用 DaprMessagingTemplate 实现发布者
DaprMessagingTemplate 类似于 Spring 的其他消息模板,但不需要特定代理作为依赖。首先在 application.properties 中定义消息组件的名称:
dapr.pubsub.name=ride-hailing
然后,我们将使用 DaprPubSubProperties 类引用该属性,并将 RideRequest 作为消息类型。这样就完成了发送消息所需的配置:
@Configuration
@EnableConfigurationProperties({ DaprPubSubProperties.class })
public class DaprMessagingConfig {
@Bean
public DaprMessagingTemplate<RideRequest> messagingTemplate(
DaprClient client, DaprPubSubProperties config) {
return new DaprMessagingTemplate<>(client, config.getName(), false);
}
}
4.1. 通过接口接收消息
接下来,创建一个控制器接收乘车请求,并使用 Dapr 模板将其转发到“ride-requests”主题。控制器可以映射到任意路径:
@RestController
@RequestMapping("/passenger")
public class PassengerRestController {
@Autowired
private DaprMessagingTemplate<RideRequest> messaging;
@PostMapping("/request-ride")
public String requestRide(@RequestBody RideRequest request) {
messaging.send("ride-requests", request);
return "等待驱动响应";
}
}
注意,我们的消息体不需要任何转换或配置,因为 Dapr 会自动处理。
5. 创建和配置订阅者
在我们的例子中,驱动作为订阅者,接收乘车请求并决定是否接受。我们将使用 Dapr 的 @Topic 注解将传入消息绑定到控制器方法。
5.1. 使用 @Topic 实现控制器
使用@Topic注解时,必须同时指定组件和主题名称。**Dapr sidecar(由测试容器自动处理)在从代理转发消息时会调用此接口:**
@RestController
@RequestMapping("driver")
public class DriverRestController {
// ...
@PostMapping("ride-request")
@Topic(pubsubName = "ride-hailing", name = "ride-requests")
public void onRideRequest(@RequestBody CloudEvent<RideRequest> event) {
// ...
}
}
注意,负载被包装在CloudEvent对象中,由 Dapr 自动创建。这对于基于 CloudEvent 元数据进行路由或过滤等高级场景很有用,但在基本发布/订阅中不是必需的。
5.2. 配置订阅者行为
订阅者代表接受或拒绝订单的驱动。为演示,我们将使用简单的模式逻辑判断订单是否可接受。在 application.properties 中添加此配置,以便在启动应用时轻松更改:
driver.acceptance.criteria=East Side
接下来,将此值注入控制器变量,并添加计数器记录接受的/拒绝的订单数:
int drivesAccepted;
int drivesRejected;
@Value("${driver.acceptance.criteria}")
String criteria;
public int getDrivesAccepted() {
return drivesAccepted;
}
public int getDrivesRejected() {
return drivesRejected;
}
我们将在编写测试时使用这些变量来检查控制器行为。
5.3. 处理 CloudEvent
最后,从 CloudEvent 中提取负载并判断订单是否可接受:
@Topic(pubsubName = "ride-hailing", name = "ride-requests")
public void onRideRequest(@RequestBody CloudEvent<RideRequest> event) {
RideRequest request = event.getData();
if (request.getDestination().contains(criteria)) {
drivesAccepted++;
} else {
drivesRejected++;
throw new UnsupportedOperationException("订单已拒绝");
}
}
由于无法直接拒绝消息,我们抛出异常来触发消息重新排队。对于 RabbitMQ,这需要 requeueInFailure 配置,我们将在创建测试容器时设置它。
6. 使用 Testcontainers 测试发布者
为验证发布者正确发送消息,我们将使用Testcontainers编写集成测试。这使我们可以启动 Dapr sidecar 和 RabbitMQ 实例,而无需依赖外部工具或 Dapr CLI。
6.1. 设置测试配置
对于测试属性,除了接受标准,我们还将包含消息组件名称和 Dapr 容器的专用服务器端口。
此外,我们需要选择固定端口,以便组件能在同一网络内相互定位:
driver.acceptance.criteria=East Side
dapr.pubsub.name=ride-hailing
server.port=60601
通过设置服务器端口号和指定组件间共享的网络开始配置。我们还将包含 DaprPubSubProperties 以稍后获取 RabbitMQ 组件名称:
@TestConfiguration(proxyBeanMethods = false)
@EnableConfigurationProperties({ DaprPubSubProperties.class })
public class DaprTestContainersConfig {
@Value("${server.port}")
private int serverPort;
@Bean
public Network daprNetwork() {
return Network.newNetwork();
}
// ...
}
6.2. 配置容器
创建暴露默认端口 5672 的 RabbitMQ 容器:
@Bean
public RabbitMQContainer rabbitMQContainer(Network daprNetwork) {
return new RabbitMQContainer(DockerImageName.parse("rabbitmq:3.7.25-management-alpine"))
.withExposedPorts(5672)
.withNetworkAliases("rabbitmq")
.withNetwork(daprNetwork);
}
最后,添加一个 Dapr 容器封装所有内容,使用@ServiceConnection注解简化配置:
@Bean
@ServiceConnection
public DaprContainer daprContainer(
Network daprNetwork, RabbitMQContainer rabbitMQ, DaprPubSubProperties pubSub) {
Map<String, String> rabbitMqConfig = new HashMap<>();
rabbitMqConfig.put("connectionString", "amqp://guest:guest@rabbitmq:5672");
rabbitMqConfig.put("user", "guest");
rabbitMqConfig.put("password", "guest");
rabbitMqConfig.put("requeueInFailure", "true");
return new DaprContainer("daprio/daprd:1.14.4")
.withAppName("dapr-pubsub")
.withNetwork(daprNetwork)
.withComponent(new Component(pubSub.getName(), "pubsub.rabbitmq", "v1", rabbitMqConfig))
.withAppPort(serverPort)
.withAppChannelAddress("host.testcontainers.internal")
.dependsOn(rabbitMQ);
}
除了模板代码,关键配置包括:
- requeueInFailure:启用此选项,因为我们无法直接NACK消息。当有多个订阅者实例时,这允许其他客户端接收被其他客户端拒绝的消息。
- *withComponent(…”pubsub.rabbitmq”)*:我们使用 RabbitMQ 实现,因此在此指定。Dapr 支持多种消息代理,包括云提供商托管服务,如Google PubSub、Amazon SQS/SNS和Azure Event Hub。
- withAppChannelAddress:包含此项以启用对容器的主机访问。没有它,测试可能在等待 Dapr 响应时挂起。
我们还可以通过日志配置启动 Dapr 容器,使调试更容易。为此,设置 withDaprLogLevel 和 withLogConsumer 选项:
.withDaprLogLevel(DaprLogLevel.INFO)
.withLogConsumer(outputFrame -> logger.info(outputFrame.getUtf8String()))
6.3. 创建测试应用
现在可以在测试包中创建测试应用:
@SpringBootApplication
public class DaprPublisherTestApp {
public static void main(String[] args) {
SpringApplication.from(DaprPublisherApp::main)
.with(DaprTestContainersConfig.class)
.run(args);
}
}
我们将引用主应用类以避免重复任何配置(如 DaprMessagingConfig 类)。我们还需要将 DriverRestController 复制到测试文件夹用于集成测试。
6.4. 创建集成测试
需要引用测试应用、配置和 DaprAutoConfiguration 类。然后注入控制器检查控制变量,并注入 Dapr 容器以了解应用何时准备好接收消息:
@SpringBootTest(
classes = {
DaprPublisherTestApp.class,
DaprTestContainersConfig.class,
DaprAutoConfiguration.class },
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class DaprPublisherIntegrationTest {
@Autowired
DriverRestController controller;
@Autowired
DaprContainer daprContainer;
@Value("${server.port}")
int serverPort;
@Value("${driver.acceptance.criteria}")
String criteria;
// ...
}
由于需要验证容器是否正确启动,我们可以等待“app is subscribed to the following topics”消息。这有助于确保测试仅在容器准备好接收消息时开始。我们还将定义 API 的基本 URI 以使用RestAssured进行调用:
@BeforeEach
void setUp() {
RestAssured.baseURI = "http://localhost:" + serverPort;
org.testcontainers.Testcontainers.exposeHostPorts(serverPort);
Wait.forLogMessage(".*app is subscribed to the following topics.*", 1)
.waitUntilReady(daprContainer);
}
第一个测试涉及发布符合驱动接受标准的订单请求,并检查接受的订单数。当该数字增加时,我们可以断言订阅者处理了消息:
@Test
void whenDriveAcceptable_thenDrivesAcceptedIncrease() {
int drivesAccepted = controller.getDrivesAccepted();
given()
.contentType(ContentType.JSON)
.body("""
{
"passengerId": "1",
"location": "Point A",
"destination": "%s Point B"
}
""".formatted(criteria))
.when()
.post("/passenger/request-ride")
.then()
.statusCode(200);
await()
.atMost(Duration.ofSeconds(5))
.until(controller::getDrivesAccepted, equalTo(drivesAccepted + 1));
}
相反,第二个测试涉及发布驱动应拒绝的订单请求:
@Test
void whenDriveUnacceptable_thenDrivesRejectedIncrease() {
int drivesRejected = controller.getDrivesRejected();
given().contentType(ContentType.JSON)
.body("""
{
"passengerId": "2",
"location": "Point B",
"destination": "West Side A"
}
""")
.when()
.post("/passenger/request-ride")
.then()
.statusCode(200);
await()
.atMost(Duration.ofSeconds(5))
.until(controller::getDrivesRejected, greaterThan(drivesRejected));
}
这次我们测试拒绝的订单数是否增加。此外,由于消息在出错时重新排队,我们验证变量大于初始值,因为我们无法确定它被处理了多少次。
7. 使用 Testcontainers 测试订阅者
现在测试订阅者行为。我们将创建与发布者类似的设置,重点验证订阅者如何处理传入消息。
7.1. 设置环境
首先包含类似的测试属性,仅更改服务器端口:
driver.acceptance.criteria=East Side
dapr.pubsub.name=ride-hailing
server.port=60602
将 DaprMessagingConfig 类复制到测试包,以便在集成测试中使用。还需要将DaprTestContainersConfig 复制到测试文件夹,因为我们需要相同的容器。
7.2. 创建集成测试
与之前的集成测试类似,我们需要连接容器、控制器、服务器端口、驱动接受标准,并在 @Setup 期间等待容器就绪。还需要包含 Dapr 消息模板向订阅者发送消息:
@SpringBootTest(
classes = {
DaprSubscriberTestApp.class,
DaprTestContainersConfig.class,
DaprMessagingConfig.class,
DaprAutoConfiguration.class },
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class DaprSubscriberIntegrationTest {
@Autowired
DaprMessagingTemplate<RideRequest> messaging;
@Autowired
DriverRestController controller;
@Autowired
DaprContainer daprContainer;
@Value("${server.port}")
int serverPort;
@Value("${driver.acceptance.criteria}")
String criteria;
// 测试设置...
}
7.3. 实现测试场景
第一个测试涉及发送可接受的订单,并检查控制器是否正确接收:
@Test
void whenDriveAcceptable_thenDrivesAcceptedIncrease() {
int drivesAccepted = controller.getDrivesAccepted();
RideRequest ride = new RideRequest(
"1", "Point A", String.format("%s Point B", criteria));
messaging.send("ride-requests", ride);
await().atMost(Duration.ofSeconds(5))
.until(controller::getDrivesAccepted, equalTo(drivesAccepted + 1));
}
第二个测试包括发送不可接受的订单,并检查控制器是否正确拒绝:
@Test
void whenDriveUnacceptable_thenDrivesRejectedIncrease() {
int drivesRejected = controller.getDrivesRejected();
RideRequest request = new RideRequest("2", "Point B", "West Side Point A");
messaging.send("ride-requests", request);
await().atMost(Duration.ofSeconds(5))
.until(controller::getDrivesRejected, greaterThan(drivesRejected));
}
通过订阅者测试,我们验证了 Dapr 正确将消息从代理路由到我们的 Spring Boot 应用,且订阅者行为符合预期。
8. 结论
在本文中,我们使用 Spring Boot 和 Dapr 构建了一个松耦合的发布/订阅消息系统。通过利用 Dapr 对消息代理的抽象及其 Spring Boot 集成,我们简化了消息逻辑而不绑定到特定基础设施。我们还演示了如何使用 Testcontainers 在本地运行和测试整个设置,在开发过程中实现快速反馈循环。
一如既往,源代码可在GitHub上获取。