2. 查询分发机制

当我们向 Axon 提交查询时,框架会将该查询分发给所有能处理此查询的已注册查询处理器。在分布式系统中,可能出现多个节点支持同类型查询的情况,也可能单个节点存在多个可处理该查询的处理器。

那么 Axon 如何决定返回哪些结果?答案取决于查询的分发方式。Axon 提供四种查询模式:

  • 点对点查询:从任意支持该查询的节点获取完整答案
  • 流式查询:从任意支持该查询的节点获取答案流
  • 分散-聚集查询:从所有支持该查询的节点获取完整答案
  • 订阅查询:获取当前答案后持续监听后续更新

接下来我们将深入探讨每种查询模式的实现细节。

3. 点对点查询

点对点查询模式下,Axon 会将查询发送给所有支持该查询的节点。Axon 假设任何节点都能提供完整答案,因此只返回最先响应节点的结果

本节我们将使用点对点查询获取系统中的所有订单。

3.1. 定义查询

Axon 使用强类型类表示查询类型并封装查询参数。由于我们要查询所有订单,不需要参数,因此使用空类:

public class FindAllOrderedProductsQuery {}

3.2. 定义查询处理器

使用 @QueryHandler 注解注册查询处理器

@Service
public class InMemoryOrdersEventHandler implements OrdersEventHandler {
    private final Map<String, Order> orders = new HashMap<>();

    @QueryHandler
    public List<Order> handle(FindAllOrderedProductsQuery query) {
        return new ArrayList<>(orders.values());
    }
}

上述代码中,handle() 方法被注册为 Axon 查询处理器:

  1. 能响应 FindAllOrderedProductsQuery 查询
  2. 返回 List<Order>Axon 会根据返回类型决定哪个处理器能处理特定查询,这有助于逐步迁移 API

这里使用 OrdersEventHandler 接口是为了方便后续切换到 MongoDB 等持久化存储实现。当前实现简单地将 Order 对象存储在内存 Map 中。

3.3. 分发点对点查询

现在我们定义好查询类型和处理器,准备向 Axon 分发查询。创建服务类:

@Service
public class OrderQueryService {
    private final QueryGateway queryGateway;

    public OrderQueryService(QueryGateway queryGateway) {
        this.queryGateway = queryGateway;
    }

    public CompletableFuture<List<OrderResponse>> findAllOrders() {
        return queryGateway.query(new FindAllOrderedProductsQuery(),
            ResponseTypes.multipleInstancesOf(Order.class))
          .thenApply(r -> r.stream()
            .map(OrderResponse::new)
            .collect(Collectors.toList()));
    }
}

代码中使用 Axon 的 QueryGateway 分发查询。通过指定 ResponseTypes.multipleInstancesOf(Order.class),Axon 知道我们只需要返回类型为 Order 集合的处理器。

为隔离内部 Order 模型与外部客户端,我们将结果包装为 OrderResponse 对象。

3.4. 测试点对点查询

使用 @SpringBootTest 进行集成测试。首先添加依赖:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-test</artifactId>
    <scope>test</scope>
</dependency>

测试代码:

@SpringBootTest(classes = OrderApplication.class)
class OrderQueryServiceIntegrationTest {

    @Autowired
    OrderQueryService queryService;

    @Autowired
    OrdersEventHandler handler;

    private String orderId;

    @BeforeEach
    void setUp() {
        orderId = UUID.randomUUID().toString();
        Order order = new Order(orderId);
        handler.reset(Collections.singletonList(order));
    }

    @Test
    void givenOrderCreatedEventSend_whenCallingAllOrders_thenOneCreatedOrderIsReturned()
            throws ExecutionException, InterruptedException {
        List<OrderResponse> result = queryService.findAllOrders().get();
        assertEquals(1, result.size());
        OrderResponse response = result.get(0);
        assertEquals(orderId, response.getOrderId());
        assertEquals(OrderStatusResponse.CREATED, response.getOrderStatus());
        assertTrue(response.getProducts().isEmpty());
    }
}

@BeforeEach 方法调用 reset()OrdersEventHandler 中的辅助方法)预加载测试订单。测试中调用服务方法验证查询处理器是否正确返回测试订单。

4. 流式查询

流式查询允许以流的形式返回大型集合。与点对点查询不同,流式查询无需等待处理器端完成全部结果,而是分段返回结果。与订阅查询不同,流式查询最终会完成。

依赖 Project Reactor 的流式查询支持背压等特性处理大型结果集。需添加依赖:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>

4.1. 定义查询

复用点对点查询的 FindAllOrderedProductsQuery

4.2. 定义查询处理器

**流式查询应返回 Publisher**。使用 Reactor 将内存 Map 的值转换为 Mono

@QueryHandler
public Publisher<Order> handleStreaming(FindAllOrderedProductsQuery query) {
    return Mono.fromCallable(orders::values).flatMapMany(Flux::fromIterable);
}

使用 flatMapMany()Mono 转换为 Publisher

4.3. 分发流式查询

OrderQueryService 中添加方法(为区分点对点查询使用不同方法名):

public Flux<OrderResponse> allOrdersStreaming() {
    Publisher<Order> publisher = queryGateway.streamingQuery(new FindAllOrderedProductsQuery(), Order.class);
    return Flux.from(publisher).map(OrderResponse::new);
}

4.4. 测试流式查询

添加测试用例:

@Test
void givenOrderCreatedEventSend_whenCallingAllOrdersStreaming_thenOneOrderIsReturned() {
    Flux<OrderResponse> result = queryService.allOrdersStreaming();
    StepVerifier.create(result)
      .assertNext(order -> assertEquals(orderId, order.getOrderId()))
      .expectComplete()
      .verify();
}

通过 expectComplete() 验证流确实完成。

5. 分散-聚集查询

分散-聚集查询会分发给所有支持该查询的节点上的所有处理器。每个处理器的结果会被合并为单一响应。如果两个节点具有相同的应用名,Axon 视它们为等价节点,只使用最先响应节点的结果。

本节创建查询统计指定产品 ID 的已发货产品总数,模拟同时查询实时系统和遗留系统。

5.1. 定义查询

需要提供产品 ID 参数,创建包含参数的 POJO

public class TotalProductsShippedQuery {
    private final String productId;

    public TotalProductsShippedQuery(String productId) {
        this.productId = productId;
    }

    // getter
}

5.2. 定义查询处理器

首先查询基于事件的系统(内存存储)。在 InMemoryOrdersEventHandler 中添加处理器:

@QueryHandler
public Integer handle(TotalProductsShippedQuery query) {
    return orders.values().stream()
      .filter(o -> o.getOrderStatus() == OrderStatus.SHIPPED)
      .map(o -> Optional.ofNullable(o.getProducts().get(query.getProductId())).orElse(0))
      .reduce(0, Integer::sum);
}

代码过滤出已发货订单,统计匹配产品 ID 的发货数量并求和。

为模拟遗留系统数据,创建独立类和处理器:

@Service
public class LegacyQueryHandler {
    @QueryHandler
    public Integer handle(TotalProductsShippedQuery query) {
        switch (query.getProductId()) {
        case "Deluxe Chair":
            return 234;
        case "a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3":
            return 10;
        default:
            return 0;
        }
    }
}

实际场景中,多个处理器通常分布在不同的 Spring 应用中。同一应用内通常不会为同一查询类型设置多个处理器

5.3. 分发分散-聚集查询

OrderQueryService 中添加方法:

public Integer totalShipped(String productId) {
    return queryGateway.scatterGather(new TotalProductsShippedQuery(productId),
        ResponseTypes.instanceOf(Integer.class), 10L, TimeUnit.SECONDS)
      .reduce(0, Integer::sum);
}

构造查询对象时传入 productId,设置 10 秒超时。Axon 只返回超时窗口内获取的结果,未及时响应的处理器结果将被忽略。

5.4. 测试分散-聚集查询

添加测试用例:

void givenThreeDeluxeChairsShipped_whenCallingAllShippedChairs_then234PlusTreeIsReturned() {
    Order order = new Order(orderId);
    order.getProducts().put("Deluxe Chair", 3);
    order.setOrderShipped();
    handler.reset(Collections.singletonList(order));

    assertEquals(237, queryService.totalShipped("Deluxe Chair"));
}

测试模拟事件驱动系统中有 3 个订单,遗留系统中硬编码 234 个豪华椅,验证合并结果为 237。

6. 订阅查询

订阅查询先返回初始结果,然后持续接收更新流。本节查询订单当前状态,并保持连接接收后续更新。

6.1. 定义查询

创建包含订单 ID 参数的查询类:

public class OrderUpdatesQuery {
    private final String orderId;

    public OrderUpdatesQuery(String orderId) {
        this.orderId = orderId;
    }

    // getter
}

6.2. 定义查询处理器

从内存 Map 获取订单的处理器很简单:

@QueryHandler
public Order handle(OrderUpdatesQuery query) {
    return orders.get(query.getOrderId());
}

6.3. 发出查询更新

订阅查询的价值在于接收更新。Axon 提供 QueryUpdateEmitter 控制订阅更新的时机和方式。将其注入 InMemoryOrdersEventHandler

@Service
public class InMemoryOrdersEventHandler implements OrdersEventHandler {

    private final QueryUpdateEmitter emitter;

    public InMemoryOrdersEventHandler(QueryUpdateEmitter emitter) {
        this.emitter = emitter;
    }

    private void emitUpdate(Order order) {
        emitter.emit(OrderUpdatesQuery.class, q -> order.getOrderId()
          .equals(q.getOrderId()), order);
    }

    // 事件和查询处理器
}

emitter.emit() 调用通知 Axon 订阅 OrderUpdatesQuery 的客户端可能需要更新。第二个参数是过滤器,确保只有匹配订单 ID 的订阅才会收到更新。

在修改订单的事件处理器中调用 emitUpdate()。例如订单发货时通知订阅者:

@Service
public class InMemoryOrdersEventHandler implements OrdersEventHandler {
    @EventHandler
    public void on(OrderShippedEvent event) {
        orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
            order.setOrderShipped();
            emitUpdate(order);
            return order;
        });
    }

    // 字段、查询处理器、其他事件处理器和 emitUpdate() 方法
}

ProductAddedEventProductCountIncrementedEvent 等事件做同样处理。

6.4. 订阅查询

使用 Reactor Core 的 Flux 构建订阅服务方法。添加依赖:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>

OrderQueryService 中实现:

public class OrderQueryService {
    public Flux<OrderResponse> orderUpdates(String orderId) {
        return subscriptionQuery(new OrderUpdatesQuery(orderId), ResponseTypes.instanceOf(Order.class))
                .map(OrderResponse::new);
    }

    private <Q, R> Flux<R> subscriptionQuery(Q query, ResponseType<R> resultType) {
        SubscriptionQueryResult<R, R> result = queryGateway.subscriptionQuery(query,
          resultType, resultType);
        return result.initialResult()
          .concatWith(result.updates())
          .doFinally(signal -> result.close());
    }

    // 其他服务方法
}

公共方法 orderUpdates() 委托给私有方法 subscriptionQuery(),并将结果包装为 OrderResponse

subscriptionQuery() 方法合并初始结果和后续更新:

  1. 调用 queryGateway.subscriptionQuery() 获取 SubscriptionQueryResult
  2. 使用 result.getInitialResult()result.getUpdates() 获取所需数据
  3. 最后关闭流

Axon 的 Reactive 扩展 提供了更便捷的订阅查询网关实现。

6.5. 测试订阅查询

使用 StepVerifier 测试返回 Flux 的方法。添加依赖:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

测试代码:

class OrderQueryServiceIntegrationTest {
    @Test
    void givenOrdersAreUpdated_whenCallingOrderUpdates_thenUpdatesReturned() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.schedule(this::addIncrementDecrementConfirmAndShip, 100L, TimeUnit.MILLISECONDS);
        try {
            StepVerifier.create(queryService.orderUpdates(orderId))
              .assertNext(order -> assertTrue(order.getProducts().isEmpty()))
              .assertNext(order -> assertEquals(1, order.getProducts().get(productId)))
              .assertNext(order -> assertEquals(2, order.getProducts().get(productId)))
              .assertNext(order -> assertEquals(1, order.getProducts().get(productId)))
              .assertNext(order -> assertEquals(OrderStatusResponse.CONFIRMED, order.getOrderStatus()))
              .assertNext(order -> assertEquals(OrderStatusResponse.SHIPPED, order.getOrderStatus()))
              .thenCancel()
              .verify();
        } finally {
            executor.shutdown();
        }
    }

    private void addIncrementDecrementConfirmAndShip() {
        sendProductAddedEvent();
        sendProductCountIncrementEvent();
        sendProductCountDecrement();
        sendOrderConfirmedEvent();
        sendOrderShippedEvent();
    }

    private void sendProductAddedEvent() {
        ProductAddedEvent event = new ProductAddedEvent(orderId, productId);
        eventGateway.publish(event);
    }

    private void sendProductCountIncrementEvent() {
        ProductCountIncrementedEvent event = new ProductCountIncrementedEvent(orderId, productId);
        eventGateway.publish(event);
    }

    private void sendProductCountDecrement() {
        ProductCountDecrementedEvent event = new ProductCountDecrementedEvent(orderId, productId);
        eventGateway.publish(event);
    }

    private void sendOrderConfirmedEvent() {
        OrderConfirmedEvent event = new OrderConfirmedEvent(orderId);
        eventGateway.publish(event);
    }

    private void sendOrderShippedEvent() {
        OrderShippedEvent event = new OrderShippedEvent(orderId);
        eventGateway.publish(event);
    }

    // 其他测试
}

测试中通过 ScheduledExecutorService 延迟 100ms 发布五个订单事件,模拟订阅开始后的更新。主线程使用 StepVerifier 验证每个更新步骤。

7. 总结

本文深入探讨了 Axon Framework 的三种查询分发方式:点对点查询、分散-聚集查询和订阅查询。每种模式适用于不同场景:

  • 点对点查询:简单粗暴获取单一完整答案
  • 流式查询:高效处理大型结果集
  • 分散-聚集查询:合并多系统数据
  • 订阅查询:实时数据推送

完整示例代码可在 GitHub 获取。实际应用中需根据业务场景选择合适的查询模式,避免踩坑。


原始标题:Dispatching Queries in Axon Framework