1. 概述
Axon框架帮助我们构建事件驱动的微服务系统。在Axon框架入门指南中,我们通过一个简单的Spring Boot应用学习了Axon,该应用包含构建和查询订单模型。在Axon框架查询分发中,我们添加了所有支持的查询类型。
本文将深入探讨Axon框架查询模型的持久化。我们将介绍使用MongoDB存储投影(projection)的方案,以及测试挑战和如何保持事件流与查询模型的同步。
2. 持久化考量
要创建使用数据库持久化查询模型的处理器,我们需要实现OrdersEventHandler
接口。在生产环境中,我们不希望每次都从头构建查询模型。使用Axon框架时,我们可以自由选择持久化方式,具体选择取决于数据特性:
- ✅ 需要全文搜索?考虑Elasticsearch
- ✅ 处理非结构化数据?MongoDB是不错选择
- ✅ 实体间存在复杂关系?图数据库如Neo4J更合适
2.1. 令牌存储
当通过事件构建查询模型时,Axon使用TokenStore跟踪处理进度。理想情况下,令牌存储应与查询模型位于同一数据库以确保一致性。使用持久化令牌存储还能支持多实例部署,每个实例只需处理部分事件。通过分段机制,实例可以申领全部或部分分段进行处理。如果使用JPA或JDBC持久化,可直接使用框架内置的*JpaTokenStore*或JdbcTokenStore。
2.2. 构建查询模型
启动时,流式事件处理器会从事件存储开始读取事件。使用持久化TokenStore时,处理器会从上次中断处继续;否则默认从头开始。对于每个事件,处理器会调用带有@EventHandler
注解的方法。
继续扩展订单应用,支持多种订单创建和更新方式。在InMemoryOrdersEventHandler
中处理ProductAddedEvent
的示例:
@EventHandler
public void on(ProductAddedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.addProduct(event.getProductId());
return order;
});
}
这里通过addProduct
函数更新内存中的订单。我们可以轻松替换为数据库存储实现。
3. MongoDB扩展
使用MongoDB持久化查询模型。我们借助Axon框架Mongo扩展将令牌存储也存入Mongo。由于已添加*axon-bom*,在pom.xml
中添加扩展时无需指定版本:
<dependency>
<groupId>org.axonframework.extensions.mongo</groupId>
<artifactId>axon-mongo</artifactId>
</dependency>
3.1. 令牌存储
添加依赖后,配置Axon使用MongoTokenStore:
@Bean
public TokenStore getTokenStore(MongoClient client, Serializer serializer){
return MongoTokenStore.builder()
.mongoTemplate(
DefaultMongoTemplate.builder()
.mongoDatabase(client)
.build()
)
.serializer(serializer)
.build();
}
3.2. 事件处理器类
通过名为mongo的Spring Profile切换事件处理器实现。激活mongo profile时,使用MongoOrdersEventHandler
及配套的令牌存储配置。完整的事件处理器类如下:
@Service
@ProcessingGroup("orders")
@Profile("mongo")
public class MongoOrdersEventHandler implements OrdersEventHandler {
// 更新和查询投影的所有方法
}
同时在InMemoryOrdersEventHandler
上添加@Profile("!mongo")
,确保两者不会同时激活。Spring profiles是条件化启用组件的绝佳方式。
通过构造函数注入获取MongoClient
和QueryUpdateEmitter
。使用MongoClient
创建Mongo集合和索引,注入QueryUpdateEmitter
以支持订阅查询:
public MongoOrdersEventHandler(MongoClient client, QueryUpdateEmitter emitter) {
orders = client
.getDatabase(AXON_FRAMEWORK_DATABASE_NAME)
.getCollection(ORDER_COLLECTION_NAME);
orders.createIndex(Indexes.ascending(ORDER_ID_PROPERTY_NAME),
new IndexOptions().unique(true));
this.emitter = emitter;
}
⚠️ 注意:我们将订单ID设为唯一索引,确保不会出现重复ID的文档。
MongoOrdersEventHandler
使用orders集合处理查询。通过documentToOrder()
方法将Mongo文档映射为订单对象:
@QueryHandler
public List<Order> handle(FindAllOrderedProductsQuery query) {
List<Order> orderList = new ArrayList<>();
orders
.find()
.forEach(d -> orderList.add(documentToOrder(d)));
return orderList;
}
3.3. 复杂查询
为处理TotalProductsShippedQuery
,添加shippedProductFilter过滤器,筛选出已发货且包含指定产品的订单:
private Bson shippedProductFilter(String productId){
return and(
eq(ORDER_STATUS_PROPERTY_NAME, OrderStatus.SHIPPED.toString()),
exists(String.format(PRODUCTS_PROPERTY_NAME + ".%s", productId))
);
}
在查询处理器中使用该过滤器提取并统计产品数量:
@QueryHandler
public Integer handle(TotalProductsShippedQuery query) {
AtomicInteger result = new AtomicInteger();
orders
.find(shippedProductFilter(query.getProductId()))
.map(d -> d.get(PRODUCTS_PROPERTY_NAME, Document.class))
.map(d -> d.getInteger(query.getProductId(), 0))
.forEach(result::addAndGet);
return result.get();
}
该查询获取所有已发货订单,提取其中的产品信息,统计指定产品的总数并返回。
4. 测试持久化查询模型
使用持久化模型测试会增加复杂度,因为我们需要为每个测试提供隔离环境。
4.1. 单元测试
对于MongoOrdersEventHandler
,必须清空集合以避免测试间状态污染。通过实现getHandler()
方法实现:
@Override
protected OrdersEventHandler getHandler() {
mongoClient.getDatabase("axonframework").drop();
return new MongoOrdersEventHandler(mongoClient, emitter);
}
使用@BeforeEach
注解确保每个测试都从干净状态开始。测试中我们使用嵌入式Mongo,也可选择test containers。本质上,查询模型测试与其他需要数据库的应用测试没有区别。
4.2. 集成测试
集成测试采用类似方法,但由于测试使用OrdersEventHandler
接口,我们依赖实现的reset()
方法。
reset()
方法实现如下:
@Override
public void reset(List<Order> orderList) {
orders.deleteMany(new Document());
orderList.forEach(o -> orders.insertOne(orderToDocument(o)));
}
reset()
方法确保集合中仅包含指定列表中的订单。在OrderQueryServiceIntegrationTest
的每个测试前执行:
@BeforeEach
void setUp() {
orderId = UUID.randomUUID().toString();
Order order = new Order(orderId);
handler.reset(Collections.singletonList(order));
}
为简化测试逻辑,我们预先存储一个订单作为测试基础数据。
5. 总结
本文展示了如何持久化查询模型,并介绍了使用MongoDB进行查询和测试的方法。关键点包括:
- ✅ 根据数据特性选择合适的持久化方案
- ✅ 使用MongoDB扩展简化令牌存储配置
- ✅ 通过Spring Profile灵活切换实现
- ❌ 测试时注意隔离环境,避免状态污染
完整代码示例可在GitHub仓库获取。踩坑提示:生产环境中务必配置持久化令牌存储,否则每次重启都会重放所有事件!