1. 概述

Axon框架帮助我们构建事件驱动的微服务系统。在Axon框架入门指南中,我们通过一个简单的Spring Boot应用学习了Axon,该应用包含构建和查询订单模型。在Axon框架查询分发中,我们添加了所有支持的查询类型。

本文将深入探讨Axon框架查询模型的持久化。我们将介绍使用MongoDB存储投影(projection)的方案,以及测试挑战和如何保持事件流与查询模型的同步。

2. 持久化考量

要创建使用数据库持久化查询模型的处理器,我们需要实现OrdersEventHandler接口。在生产环境中,我们不希望每次都从头构建查询模型。使用Axon框架时,我们可以自由选择持久化方式,具体选择取决于数据特性:

  • ✅ 需要全文搜索?考虑Elasticsearch
  • ✅ 处理非结构化数据?MongoDB是不错选择
  • ✅ 实体间存在复杂关系?图数据库如Neo4J更合适

2.1. 令牌存储

当通过事件构建查询模型时,Axon使用TokenStore跟踪处理进度。理想情况下,令牌存储应与查询模型位于同一数据库以确保一致性。使用持久化令牌存储还能支持多实例部署,每个实例只需处理部分事件。通过分段机制,实例可以申领全部或部分分段进行处理。如果使用JPAJDBC持久化,可直接使用框架内置的*JpaTokenStore*或JdbcTokenStore

2.2. 构建查询模型

启动时,流式事件处理器会从事件存储开始读取事件。使用持久化TokenStore时,处理器会从上次中断处继续;否则默认从头开始。对于每个事件,处理器会调用带有@EventHandler注解的方法。

继续扩展订单应用,支持多种订单创建和更新方式。在InMemoryOrdersEventHandler中处理ProductAddedEvent的示例:

@EventHandler
public void on(ProductAddedEvent event) {
    orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
      order.addProduct(event.getProductId());
      return order;
    });
}

这里通过addProduct函数更新内存中的订单。我们可以轻松替换为数据库存储实现。

3. MongoDB扩展

使用MongoDB持久化查询模型。我们借助Axon框架Mongo扩展将令牌存储也存入Mongo。由于已添加*axon-bom*,在pom.xml中添加扩展时无需指定版本:

<dependency>
    <groupId>org.axonframework.extensions.mongo</groupId>
    <artifactId>axon-mongo</artifactId>
</dependency>

3.1. 令牌存储

添加依赖后,配置Axon使用MongoTokenStore

@Bean
public TokenStore getTokenStore(MongoClient client, Serializer serializer){
    return MongoTokenStore.builder()
      .mongoTemplate(
        DefaultMongoTemplate.builder()
          .mongoDatabase(client)
          .build()
      )
      .serializer(serializer)
      .build();
}

3.2. 事件处理器类

通过名为mongoSpring Profile切换事件处理器实现。激活mongo profile时,使用MongoOrdersEventHandler及配套的令牌存储配置。完整的事件处理器类如下:

@Service
@ProcessingGroup("orders")
@Profile("mongo")
public class MongoOrdersEventHandler implements OrdersEventHandler {
    // 更新和查询投影的所有方法
}

同时在InMemoryOrdersEventHandler上添加@Profile("!mongo"),确保两者不会同时激活。Spring profiles是条件化启用组件的绝佳方式。

通过构造函数注入获取MongoClientQueryUpdateEmitter。使用MongoClient创建Mongo集合和索引,注入QueryUpdateEmitter以支持订阅查询

public MongoOrdersEventHandler(MongoClient client, QueryUpdateEmitter emitter) {
    orders = client
      .getDatabase(AXON_FRAMEWORK_DATABASE_NAME)
      .getCollection(ORDER_COLLECTION_NAME);
    orders.createIndex(Indexes.ascending(ORDER_ID_PROPERTY_NAME),
      new IndexOptions().unique(true));
    this.emitter = emitter;
}

⚠️ 注意:我们将订单ID设为唯一索引,确保不会出现重复ID的文档。

MongoOrdersEventHandler使用orders集合处理查询。通过documentToOrder()方法将Mongo文档映射为订单对象:

@QueryHandler
public List<Order> handle(FindAllOrderedProductsQuery query) {
    List<Order> orderList = new ArrayList<>();
    orders
      .find()
      .forEach(d -> orderList.add(documentToOrder(d)));
    return orderList;
}

3.3. 复杂查询

为处理TotalProductsShippedQuery,添加shippedProductFilter过滤器,筛选出已发货且包含指定产品的订单:

private Bson shippedProductFilter(String productId){
    return and(
      eq(ORDER_STATUS_PROPERTY_NAME, OrderStatus.SHIPPED.toString()),
      exists(String.format(PRODUCTS_PROPERTY_NAME + ".%s", productId))
    );
}

在查询处理器中使用该过滤器提取并统计产品数量:

@QueryHandler
public Integer handle(TotalProductsShippedQuery query) {
    AtomicInteger result = new AtomicInteger();
    orders
      .find(shippedProductFilter(query.getProductId()))
      .map(d -> d.get(PRODUCTS_PROPERTY_NAME, Document.class))
      .map(d -> d.getInteger(query.getProductId(), 0))
      .forEach(result::addAndGet);
    return result.get();
}

该查询获取所有已发货订单,提取其中的产品信息,统计指定产品的总数并返回。

4. 测试持久化查询模型

使用持久化模型测试会增加复杂度,因为我们需要为每个测试提供隔离环境

4.1. 单元测试

对于MongoOrdersEventHandler必须清空集合以避免测试间状态污染。通过实现getHandler()方法实现:

@Override
protected OrdersEventHandler getHandler() {
    mongoClient.getDatabase("axonframework").drop();
    return new MongoOrdersEventHandler(mongoClient, emitter);
}

使用@BeforeEach注解确保每个测试都从干净状态开始。测试中我们使用嵌入式Mongo,也可选择test containers。本质上,查询模型测试与其他需要数据库的应用测试没有区别。

4.2. 集成测试

集成测试采用类似方法,但由于测试使用OrdersEventHandler接口,我们依赖实现的reset()方法

reset()方法实现如下:

@Override
public void reset(List<Order> orderList) {
    orders.deleteMany(new Document());
    orderList.forEach(o -> orders.insertOne(orderToDocument(o)));
}

reset()方法确保集合中仅包含指定列表中的订单。在OrderQueryServiceIntegrationTest的每个测试前执行:

@BeforeEach
void setUp() {
    orderId = UUID.randomUUID().toString();
    Order order = new Order(orderId);
    handler.reset(Collections.singletonList(order));
}

为简化测试逻辑,我们预先存储一个订单作为测试基础数据。

5. 总结

本文展示了如何持久化查询模型,并介绍了使用MongoDB进行查询和测试的方法。关键点包括:

  • ✅ 根据数据特性选择合适的持久化方案
  • ✅ 使用MongoDB扩展简化令牌存储配置
  • ✅ 通过Spring Profile灵活切换实现
  • ❌ 测试时注意隔离环境,避免状态污染

完整代码示例可在GitHub仓库获取。踩坑提示:生产环境中务必配置持久化令牌存储,否则每次重启都会重放所有事件!


原始标题:Persisting the Query Model