2. 查询分发机制
当我们向 Axon 提交查询时,框架会将该查询分发给所有能处理此查询的已注册查询处理器。在分布式系统中,可能出现多个节点支持同类型查询的情况,也可能单个节点存在多个可处理该查询的处理器。
那么 Axon 如何决定返回哪些结果?答案取决于查询的分发方式。Axon 提供四种查询模式:
- 点对点查询:从任意支持该查询的节点获取完整答案
- 流式查询:从任意支持该查询的节点获取答案流
- 分散-聚集查询:从所有支持该查询的节点获取完整答案
- 订阅查询:获取当前答案后持续监听后续更新
接下来我们将深入探讨每种查询模式的实现细节。
3. 点对点查询
点对点查询模式下,Axon 会将查询发送给所有支持该查询的节点。Axon 假设任何节点都能提供完整答案,因此只返回最先响应节点的结果。
本节我们将使用点对点查询获取系统中的所有订单。
3.1. 定义查询
Axon 使用强类型类表示查询类型并封装查询参数。由于我们要查询所有订单,不需要参数,因此使用空类:
public class FindAllOrderedProductsQuery {}
3.2. 定义查询处理器
使用 @QueryHandler
注解注册查询处理器:
@Service
public class InMemoryOrdersEventHandler implements OrdersEventHandler {
private final Map<String, Order> orders = new HashMap<>();
@QueryHandler
public List<Order> handle(FindAllOrderedProductsQuery query) {
return new ArrayList<>(orders.values());
}
}
上述代码中,handle()
方法被注册为 Axon 查询处理器:
- 能响应
FindAllOrderedProductsQuery
查询 - 返回
List<Order>
。Axon 会根据返回类型决定哪个处理器能处理特定查询,这有助于逐步迁移 API
这里使用 OrdersEventHandler
接口是为了方便后续切换到 MongoDB 等持久化存储实现。当前实现简单地将 Order
对象存储在内存 Map
中。
3.3. 分发点对点查询
现在我们定义好查询类型和处理器,准备向 Axon 分发查询。创建服务类:
@Service
public class OrderQueryService {
private final QueryGateway queryGateway;
public OrderQueryService(QueryGateway queryGateway) {
this.queryGateway = queryGateway;
}
public CompletableFuture<List<OrderResponse>> findAllOrders() {
return queryGateway.query(new FindAllOrderedProductsQuery(),
ResponseTypes.multipleInstancesOf(Order.class))
.thenApply(r -> r.stream()
.map(OrderResponse::new)
.collect(Collectors.toList()));
}
}
代码中使用 Axon 的 QueryGateway
分发查询。通过指定 ResponseTypes.multipleInstancesOf(Order.class)
,Axon 知道我们只需要返回类型为 Order
集合的处理器。
为隔离内部 Order
模型与外部客户端,我们将结果包装为 OrderResponse
对象。
3.4. 测试点对点查询
使用 @SpringBootTest
进行集成测试。首先添加依赖:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
测试代码:
@SpringBootTest(classes = OrderApplication.class)
class OrderQueryServiceIntegrationTest {
@Autowired
OrderQueryService queryService;
@Autowired
OrdersEventHandler handler;
private String orderId;
@BeforeEach
void setUp() {
orderId = UUID.randomUUID().toString();
Order order = new Order(orderId);
handler.reset(Collections.singletonList(order));
}
@Test
void givenOrderCreatedEventSend_whenCallingAllOrders_thenOneCreatedOrderIsReturned()
throws ExecutionException, InterruptedException {
List<OrderResponse> result = queryService.findAllOrders().get();
assertEquals(1, result.size());
OrderResponse response = result.get(0);
assertEquals(orderId, response.getOrderId());
assertEquals(OrderStatusResponse.CREATED, response.getOrderStatus());
assertTrue(response.getProducts().isEmpty());
}
}
@BeforeEach
方法调用 reset()
(OrdersEventHandler
中的辅助方法)预加载测试订单。测试中调用服务方法验证查询处理器是否正确返回测试订单。
4. 流式查询
流式查询允许以流的形式返回大型集合。与点对点查询不同,流式查询无需等待处理器端完成全部结果,而是分段返回结果。与订阅查询不同,流式查询最终会完成。
依赖 Project Reactor 的流式查询支持背压等特性处理大型结果集。需添加依赖:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
4.1. 定义查询
复用点对点查询的 FindAllOrderedProductsQuery
。
4.2. 定义查询处理器
**流式查询应返回 Publisher
**。使用 Reactor 将内存 Map 的值转换为 Mono
:
@QueryHandler
public Publisher<Order> handleStreaming(FindAllOrderedProductsQuery query) {
return Mono.fromCallable(orders::values).flatMapMany(Flux::fromIterable);
}
使用 flatMapMany()
将 Mono
转换为 Publisher
。
4.3. 分发流式查询
在 OrderQueryService
中添加方法(为区分点对点查询使用不同方法名):
public Flux<OrderResponse> allOrdersStreaming() {
Publisher<Order> publisher = queryGateway.streamingQuery(new FindAllOrderedProductsQuery(), Order.class);
return Flux.from(publisher).map(OrderResponse::new);
}
4.4. 测试流式查询
添加测试用例:
@Test
void givenOrderCreatedEventSend_whenCallingAllOrdersStreaming_thenOneOrderIsReturned() {
Flux<OrderResponse> result = queryService.allOrdersStreaming();
StepVerifier.create(result)
.assertNext(order -> assertEquals(orderId, order.getOrderId()))
.expectComplete()
.verify();
}
通过 expectComplete()
验证流确实完成。
5. 分散-聚集查询
分散-聚集查询会分发给所有支持该查询的节点上的所有处理器。每个处理器的结果会被合并为单一响应。如果两个节点具有相同的应用名,Axon 视它们为等价节点,只使用最先响应节点的结果。
本节创建查询统计指定产品 ID 的已发货产品总数,模拟同时查询实时系统和遗留系统。
5.1. 定义查询
需要提供产品 ID 参数,创建包含参数的 POJO:
public class TotalProductsShippedQuery {
private final String productId;
public TotalProductsShippedQuery(String productId) {
this.productId = productId;
}
// getter
}
5.2. 定义查询处理器
首先查询基于事件的系统(内存存储)。在 InMemoryOrdersEventHandler
中添加处理器:
@QueryHandler
public Integer handle(TotalProductsShippedQuery query) {
return orders.values().stream()
.filter(o -> o.getOrderStatus() == OrderStatus.SHIPPED)
.map(o -> Optional.ofNullable(o.getProducts().get(query.getProductId())).orElse(0))
.reduce(0, Integer::sum);
}
代码过滤出已发货订单,统计匹配产品 ID 的发货数量并求和。
为模拟遗留系统数据,创建独立类和处理器:
@Service
public class LegacyQueryHandler {
@QueryHandler
public Integer handle(TotalProductsShippedQuery query) {
switch (query.getProductId()) {
case "Deluxe Chair":
return 234;
case "a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3":
return 10;
default:
return 0;
}
}
}
实际场景中,多个处理器通常分布在不同的 Spring 应用中。同一应用内通常不会为同一查询类型设置多个处理器。
5.3. 分发分散-聚集查询
在 OrderQueryService
中添加方法:
public Integer totalShipped(String productId) {
return queryGateway.scatterGather(new TotalProductsShippedQuery(productId),
ResponseTypes.instanceOf(Integer.class), 10L, TimeUnit.SECONDS)
.reduce(0, Integer::sum);
}
构造查询对象时传入 productId
,设置 10 秒超时。Axon 只返回超时窗口内获取的结果,未及时响应的处理器结果将被忽略。
5.4. 测试分散-聚集查询
添加测试用例:
void givenThreeDeluxeChairsShipped_whenCallingAllShippedChairs_then234PlusTreeIsReturned() {
Order order = new Order(orderId);
order.getProducts().put("Deluxe Chair", 3);
order.setOrderShipped();
handler.reset(Collections.singletonList(order));
assertEquals(237, queryService.totalShipped("Deluxe Chair"));
}
测试模拟事件驱动系统中有 3 个订单,遗留系统中硬编码 234 个豪华椅,验证合并结果为 237。
6. 订阅查询
订阅查询先返回初始结果,然后持续接收更新流。本节查询订单当前状态,并保持连接接收后续更新。
6.1. 定义查询
创建包含订单 ID 参数的查询类:
public class OrderUpdatesQuery {
private final String orderId;
public OrderUpdatesQuery(String orderId) {
this.orderId = orderId;
}
// getter
}
6.2. 定义查询处理器
从内存 Map 获取订单的处理器很简单:
@QueryHandler
public Order handle(OrderUpdatesQuery query) {
return orders.get(query.getOrderId());
}
6.3. 发出查询更新
订阅查询的价值在于接收更新。Axon 提供 QueryUpdateEmitter
控制订阅更新的时机和方式。将其注入 InMemoryOrdersEventHandler
:
@Service
public class InMemoryOrdersEventHandler implements OrdersEventHandler {
private final QueryUpdateEmitter emitter;
public InMemoryOrdersEventHandler(QueryUpdateEmitter emitter) {
this.emitter = emitter;
}
private void emitUpdate(Order order) {
emitter.emit(OrderUpdatesQuery.class, q -> order.getOrderId()
.equals(q.getOrderId()), order);
}
// 事件和查询处理器
}
emitter.emit()
调用通知 Axon 订阅 OrderUpdatesQuery
的客户端可能需要更新。第二个参数是过滤器,确保只有匹配订单 ID 的订阅才会收到更新。
在修改订单的事件处理器中调用 emitUpdate()
。例如订单发货时通知订阅者:
@Service
public class InMemoryOrdersEventHandler implements OrdersEventHandler {
@EventHandler
public void on(OrderShippedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.setOrderShipped();
emitUpdate(order);
return order;
});
}
// 字段、查询处理器、其他事件处理器和 emitUpdate() 方法
}
对 ProductAddedEvent
、ProductCountIncrementedEvent
等事件做同样处理。
6.4. 订阅查询
使用 Reactor Core 的 Flux
构建订阅服务方法。添加依赖:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
在 OrderQueryService
中实现:
public class OrderQueryService {
public Flux<OrderResponse> orderUpdates(String orderId) {
return subscriptionQuery(new OrderUpdatesQuery(orderId), ResponseTypes.instanceOf(Order.class))
.map(OrderResponse::new);
}
private <Q, R> Flux<R> subscriptionQuery(Q query, ResponseType<R> resultType) {
SubscriptionQueryResult<R, R> result = queryGateway.subscriptionQuery(query,
resultType, resultType);
return result.initialResult()
.concatWith(result.updates())
.doFinally(signal -> result.close());
}
// 其他服务方法
}
公共方法 orderUpdates()
委托给私有方法 subscriptionQuery()
,并将结果包装为 OrderResponse
。
subscriptionQuery()
方法合并初始结果和后续更新:
- 调用
queryGateway.subscriptionQuery()
获取SubscriptionQueryResult
- 使用
result.getInitialResult()
和result.getUpdates()
获取所需数据 - 最后关闭流
Axon 的 Reactive 扩展 提供了更便捷的订阅查询网关实现。
6.5. 测试订阅查询
使用 StepVerifier
测试返回 Flux
的方法。添加依赖:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
测试代码:
class OrderQueryServiceIntegrationTest {
@Test
void givenOrdersAreUpdated_whenCallingOrderUpdates_thenUpdatesReturned() {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.schedule(this::addIncrementDecrementConfirmAndShip, 100L, TimeUnit.MILLISECONDS);
try {
StepVerifier.create(queryService.orderUpdates(orderId))
.assertNext(order -> assertTrue(order.getProducts().isEmpty()))
.assertNext(order -> assertEquals(1, order.getProducts().get(productId)))
.assertNext(order -> assertEquals(2, order.getProducts().get(productId)))
.assertNext(order -> assertEquals(1, order.getProducts().get(productId)))
.assertNext(order -> assertEquals(OrderStatusResponse.CONFIRMED, order.getOrderStatus()))
.assertNext(order -> assertEquals(OrderStatusResponse.SHIPPED, order.getOrderStatus()))
.thenCancel()
.verify();
} finally {
executor.shutdown();
}
}
private void addIncrementDecrementConfirmAndShip() {
sendProductAddedEvent();
sendProductCountIncrementEvent();
sendProductCountDecrement();
sendOrderConfirmedEvent();
sendOrderShippedEvent();
}
private void sendProductAddedEvent() {
ProductAddedEvent event = new ProductAddedEvent(orderId, productId);
eventGateway.publish(event);
}
private void sendProductCountIncrementEvent() {
ProductCountIncrementedEvent event = new ProductCountIncrementedEvent(orderId, productId);
eventGateway.publish(event);
}
private void sendProductCountDecrement() {
ProductCountDecrementedEvent event = new ProductCountDecrementedEvent(orderId, productId);
eventGateway.publish(event);
}
private void sendOrderConfirmedEvent() {
OrderConfirmedEvent event = new OrderConfirmedEvent(orderId);
eventGateway.publish(event);
}
private void sendOrderShippedEvent() {
OrderShippedEvent event = new OrderShippedEvent(orderId);
eventGateway.publish(event);
}
// 其他测试
}
测试中通过 ScheduledExecutorService
延迟 100ms 发布五个订单事件,模拟订阅开始后的更新。主线程使用 StepVerifier
验证每个更新步骤。
7. 总结
本文深入探讨了 Axon Framework 的三种查询分发方式:点对点查询、分散-聚集查询和订阅查询。每种模式适用于不同场景:
- 点对点查询:简单粗暴获取单一完整答案
- 流式查询:高效处理大型结果集
- 分散-聚集查询:合并多系统数据
- 订阅查询:实时数据推送
完整示例代码可在 GitHub 获取。实际应用中需根据业务场景选择合适的查询模式,避免踩坑。