1. 概述

Axon Framework帮助我们构建事件驱动的微服务系统。在《Axon Framework指南》中,我们通过一个简单的Axon Spring Boot应用了解了Axon。该应用可以创建和更新订单,还能确认和发货。

在《Axon Framework中的查询分发》中,我们为OrderQueryService添加了更多查询功能。查询通常用于UI层,而UI层通常调用REST接口。

本教程将为所有查询创建REST接口,并通过集成测试来使用这些接口。

2. 在REST接口中使用查询

我们可以通过在带有@RestController注解的类中添加方法来创建REST接口。这里使用OrderRestEndpoint类。之前我们直接在控制器中使用QueryGateway,现在将其替换为OrderQueryService(在《Axon Framework中的查询分发》中实现)。这样控制器方法只需负责将行为绑定到REST路径。

所有接口都列在项目的order-api.http文件中。使用IntelliJ作为IDE时,可以直接调用这些接口。

2.1 点对点查询

点对点查询只有一个响应,实现起来很简单:

@GetMapping("/all-orders")
public CompletableFuture<List<OrderResponse>> findAllOrders() {
    return orderQueryService.findAllOrders();
}

Spring会等待CompletableFuture完成,并以JSON格式返回响应。我们可以通过调用localhost:8080/all-orders来测试,获取所有订单的数组。

在干净的环境中,先通过POST请求创建两个订单:

  • http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768
  • http://localhost:8080/ship-order

然后调用http://localhost:8080/all-orders会得到:

[
  {
    "orderId": "72d67527-a27c-416e-a904-396ebf222344",
    "products": {
      "Deluxe Chair": 1
    },
    "orderStatus": "SHIPPED"
  },
  {
    "orderId": "666a1661-474d-4046-8b12-8b5896312768",
    "products": {},
    "orderStatus": "CREATED"
  }
]

2.2 流式查询

流式查询返回事件流并最终关闭。虽然可以等待流关闭后一次性返回响应,但直接流式传输更高效。我们使用Server-Send事件实现:

@GetMapping(path = "/all-orders-streaming", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<OrderResponse> allOrdersStreaming() {
    return orderQueryService.allOrdersStreaming();
}

通过指定媒体类型,Spring知道要以Server-Send事件格式返回响应。每个订单会单独发送。如果客户端支持Server-Send事件,调用localhost:8080/all-orders-streaming会逐个返回所有订单。

使用与点对点查询相同的数据,结果如下:

data:{"orderId":"72d67527-a27c-416e-a904-396ebf222344","products":{"Deluxe Chair":1},"orderStatus":"SHIPPED"}

data:{"orderId":"666a1661-474d-4046-8b12-8b5896312768","products":{},"orderStatus":"CREATED"}

这些都是独立的Server-Send事件。

2.3 散播-聚集查询

组合Axon查询响应的逻辑已在OrderQueryService中实现。这使得实现与点对点查询非常相似,因为最终只有一个响应。例如,使用散播-聚集查询添加接口:

@GetMapping("/total-shipped/{product-id}")
public Integer totalShipped(@PathVariable("product-id") String productId) {
    return orderQueryService.totalShipped(productId);
}

调用http://localhost:8080/total-shipped/Deluxe Chair返回已发货椅子的总数,包括LegacyQueryHandler返回的234个。如果数据库中还有通过ship-order发货的椅子,结果应为235。

2.4 订阅查询

与流式查询不同,订阅查询可能永不结束。因此等待其完成不可行。我们再次使用Server-Send事件添加接口:

@GetMapping(path = "/order-updates/{order-id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<OrderResponse> orderUpdates(@PathVariable("order-id") String orderId) {
    return orderQueryService.orderUpdates(orderId);
}

调用http://localhost:8080/order-updates/666a1661-474d-4046-8b12-8b5896312768会返回该订单的更新流。通过POST请求http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768/product/a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3触发更新后,该更新会作为Server-Send事件发送。

我们会看到初始状态和更新后的状态,连接保持开放以接收后续更新:

data:{"orderId":"666a1661-474d-4046-8b12-8b5896312768","products":{},"orderStatus":"CREATED"}

data:{"orderId":"666a1661-474d-4046-8b12-8b5896312768","products":{"a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3":1},"orderStatus":"CREATED"}

可以看到更新包含了我们添加的产品。

3. 集成测试

集成测试使用WebClient。测试时通过@SpringBootTest运行整个应用,先通过其他REST接口改变状态。这些接口会触发一个或多个命令,进而创建事件。创建订单时使用《Axon Framework指南》中添加的接口。每个测试使用@DirtiesContext注解,确保一个测试中创建的事件不会影响其他测试。

src/test/resourcesapplication.properties中设置axon.axonserver.enabled=false,避免测试时运行Axon Server。这样会使用非分布式网关,运行更快且不需要Axon Server。网关是处理三种不同类型消息的实例。

可以创建辅助方法提高测试可读性。这些方法提供正确的类型并在需要时设置HTTP头。例如:

private void verifyVoidPost(WebClient client, String uri) {
    StepVerifier.create(retrieveResponse(client.post()
      .uri(uri)))
      .verifyComplete();
}

这对调用返回void的POST接口很有用。它使用retrieveResponse()辅助方法执行调用并验证完成。这类操作很常见,将其放入辅助方法使测试更易读和维护。

3.1 测试点对点查询

测试/all-orders接口时,创建一个订单并验证能否检索到该订单。首先需要创建WebClient(用于发起HTTP调用的响应式实例)。创建订单后,获取所有订单并验证结果:

WebClient client = WebClient.builder()
  .clientConnector(httpConnector())
  .build();
createRandomNewOrder(client);
StepVerifier.create(retrieveListResponse(client.get()
    .uri("http://localhost:" + port + "/all-orders")))
  .expectNextMatches(list -> 1 == list.size() && list.get(0)
    .getOrderStatus() == OrderStatusResponse.CREATED)
  .verifyComplete();

由于是响应式的,使用reactor-test的StepVerifier验证响应。期望列表中只有一个刚创建的订单,且订单状态为CREATED。

3.2 测试流式查询

流式查询可能返回多个订单。我们还需要测试流是否完成。测试中创建三个随机订单,然后验证流式查询响应:

WebClient client = WebClient.builder()
  .clientConnector(httpConnector())
  .build();
for (int i = 0; i < 3; i++) {
    createRandomNewOrder(client);
}
StepVerifier.create(retrieveStreamingResponse(client.get()
    .uri("http://localhost:" + port + "/all-orders-streaming")))
  .expectNextMatches(o -> o.getOrderStatus() == OrderStatusResponse.CREATED)
  .expectNextMatches(o -> o.getOrderStatus() == OrderStatusResponse.CREATED)
  .expectNextMatches(o -> o.getOrderStatus() == OrderStatusResponse.CREATED)
  .verifyComplete();

通过最后的verifyComplete()确保流关闭。⚠️ 注意:流式查询可能实现为永不完成,但这里会完成,因此验证这一点很重要。

3.3 测试散播-聚集查询

测试散播-聚集查询时,需要确保多个处理器的结果被组合。通过接口发货一把椅子,然后检索所有已发货的椅子。由于LegacyQueryHandler返回234,结果应为235

WebClient client = WebClient.builder()
  .clientConnector(httpConnector())
  .build();
verifyVoidPost(client, "http://localhost:" + port + "/ship-order");
StepVerifier.create(retrieveIntegerResponse(client.get()
    .uri("http://localhost:" + port + "/total-shipped/Deluxe Chair")))
  .assertNext(r -> assertEquals(235, r))
  .verifyComplete();

retrieveIntegerResponse()辅助方法从响应体中提取整数。

3.4 测试订阅查询

订阅查询在连接关闭前会一直保持活跃。我们希望同时测试初始结果和更新。因此使用ScheduledExecutorService在测试中启用多线程:一个线程更新订单,另一个线程验证返回的订单。为提高可读性,使用单独的方法执行更新:

private void addIncrementDecrementConfirmAndShipProduct(String orderId, String productId) {
    WebClient client = WebClient.builder()
      .clientConnector(httpConnector())
      .build();
    String base = "http://localhost:" + port + "/order/" + orderId;
    verifyVoidPost(client, base + "/product/" + productId);
    verifyVoidPost(client, base + "/product/" + productId + "/increment");
    // 其他操作...
}

该方法创建并使用自己的WebClient实例,避免与验证响应的实例冲突。

实际测试通过执行器调用此方法并验证更新:

// 创建两个WebClient,生成测试ID,创建订单
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.schedule(() -> addIncrementDecrementConfirmAndShipProduct(orderId, productId), 1L, TimeUnit.SECONDS);
try {
    StepVerifier.create(retrieveStreamingResponse(receiverClient.get()
      .uri("http://localhost:" + port + "/order-updates/" + orderId)))
      .assertNext(p -> assertTrue(p.getProducts().isEmpty()))
      // 其他断言...
      .assertNext(p -> assertEquals(OrderStatusResponse.SHIPPED, p.getOrderStatus()))
      .thenCancel()
      .verify();
} finally {
    executor.shutdown();
}

⚠️ 注意:等待1秒再执行更新,确保不会错过第一次更新。使用随机UUID生成productId,用于更新和验证结果。每次变更都应触发更新。

根据更新后的预期状态添加断言。必须调用thenCancel()结束测试,否则订阅会一直保持开放。使用finally块确保关闭执行器。

4. 总结

本文介绍了如何为查询添加REST接口,这些接口可用于构建UI。还学习了如何使用WebClient测试这些接口。

所有示例和代码片段可在GitHub上找到。

更多问题请参考Discuss AxonIQ


原始标题:Using and Testing Axon Applications via REST