1. 概述
本文将探讨Axon框架,并展示它如何帮助我们实现基于CQRS(命令查询职责分离)和事件溯源的应用。
我们将同时使用Axon Framework和Axon Server。前者包含我们的实现代码,后者则作为专用的事件存储和消息路由解决方案。
示例应用聚焦于订单(Order)领域。我们将充分利用Axon提供的CQRS和事件溯源构建块。
注意:许多共享概念直接源自DDD(领域驱动设计),但这超出了本文范围。
2. Maven依赖
创建一个Axon/Spring Boot应用,需要在pom.xml
中添加最新的axon-spring-boot-starter依赖,以及用于测试的axon-test依赖。为确保版本一致,在依赖管理部分添加axon-bom:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-bom</artifactId>
<version>4.9.3</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
3. Axon Server
我们将使用Axon Server作为事件存储和专用的命令、事件及查询路由解决方案。
作为事件存储,它提供了存储事件所需的理想特性。这篇文章解释了为什么这很重要。
作为消息路由解决方案,它允许我们连接多个实例,而无需配置RabbitMQ或Kafka主题来共享和分发消息。
Axon Server可从此处下载。它是一个简单的JAR文件,通过以下命令启动:
java -jar axonserver.jar
这将启动一个可通过localhost:8024访问的Axon Server实例。该接口提供已连接应用及其可处理消息的概览,以及查询Axon Server内事件存储的机制。
Axon Server的默认配置配合axon-spring-boot-starter
依赖,将确保我们的订单服务自动连接到它。
4. 订单服务API – 命令
我们将以CQRS思想构建订单服务,因此重点在于流经应用的消息。
首先定义命令(Commands),即意图的表达式。订单服务可处理三种操作:
- 创建新订单
- 确认订单
- 发货订单
自然地,领域需要处理三个命令消息:CreateOrderCommand
、ConfirmOrderCommand
和ShipOrderCommand
:
public class CreateOrderCommand {
@TargetAggregateIdentifier
private final String orderId;
private final String productId;
// 构造方法、getter、equals/hashCode和toString方法
}
public class ConfirmOrderCommand {
@TargetAggregateIdentifier
private final String orderId;
// 构造方法、getter、equals/hashCode和toString方法
}
public class ShipOrderCommand {
@TargetAggregateIdentifier
private final String orderId;
// 构造方法、getter、equals/hashCode和toString方法
}
@TargetAggregateIdentifier
注解告诉Axon:标注字段是目标聚合根的ID,命令应路由至该聚合根。稍后我们会简要介绍聚合根。
注意:命令中的字段标记为final
是故意的,这是所有消息实现的最佳实践——保持不可变性。
5. 订单服务API – 事件
聚合根将处理命令,因为它负责决定订单能否被创建、确认或发货。
它通过发布事件通知应用其余部分其决策。我们定义三种事件:OrderCreatedEvent
、OrderConfirmedEvent
和OrderShippedEvent
:
public class OrderCreatedEvent {
private final String orderId;
private final String productId;
// 默认构造方法、getter、equals/hashCode和toString方法
}
public class OrderConfirmedEvent {
private final String orderId;
// 默认构造方法、getter、equals/hashCode和toString方法
}
public class OrderShippedEvent {
private final String orderId;
// 默认构造方法、getter、equals/hashCode和toString方法
}
6. 命令模型 – 订单聚合根
既然已用命令和事件建模了核心API,现在可以开始构建命令模型。
聚合根是命令模型中的常规组件,源自DDD。其他框架也使用此概念,如这篇文章所示。
由于领域聚焦于订单处理,我们将创建OrderAggregate
作为命令模型的核心。
6.1. 聚合根类
创建基础聚合根类:
@Aggregate
public class OrderAggregate {
@AggregateIdentifier
private String orderId;
private boolean orderConfirmed;
@CommandHandler
public OrderAggregate(CreateOrderCommand command) {
AggregateLifecycle.apply(new OrderCreatedEvent(command.getOrderId(), command.getProductId()));
}
@EventSourcingHandler
public void on(OrderCreatedEvent event) {
this.orderId = event.getOrderId();
orderConfirmed = false;
}
protected OrderAggregate() { }
}
@Aggregate
注解是Axon Spring特有的,标记此类为聚合根。它将通知框架:需要为该OrderAggregate
实例化CQRS和事件溯源所需的构建块。
由于聚合根处理针对特定实例的命令,需用@AggregateIdentifier
注解指定标识符。
聚合根在处理OrderAggregate
的"命令处理构造函数"中的CreateOrderCommand
时启动生命周期。为告知框架某方法可处理命令,我们添加@CommandHandler
注解。
处理CreateOrderCommand
时,通过发布OrderCreatedEvent
通知应用其余部分订单已创建。要从聚合根内发布事件,使用AggregateLifecycle#apply(Object...)
。
现在可以开始引入事件溯源:基于事件流重建聚合根实例。
从"聚合创建事件"OrderCreatedEvent
开始,由@EventSourcingHandler
注解的方法处理,以设置订单聚合根的orderId
和orderConfirmed
状态。
注意:Axon要求存在默认构造函数,以便基于事件重建聚合根。
6.2. 聚合根命令处理器
现在基础聚合根已就绪,实现剩余命令处理器:
@CommandHandler
public void handle(ConfirmOrderCommand command) {
if (orderConfirmed) {
return;
}
apply(new OrderConfirmedEvent(orderId));
}
@CommandHandler
public void handle(ShipOrderCommand command) {
if (!orderConfirmed) {
throw new UnconfirmedOrderException();
}
apply(new OrderShippedEvent(orderId));
}
@EventSourcingHandler
public void on(OrderConfirmedEvent event) {
orderConfirmed = true;
}
命令和事件处理器的签名简化为handle({命令})
和on({事件})
以保持简洁。
此外,我们定义了订单只能确认一次,且仅在确认后才能发货。因此前者忽略命令,后者未满足条件时抛出UnconfirmedOrderException
。
这体现了OrderConfirmedEvent
事件处理器更新订单聚合根orderConfirmed
状态为true
的必要性。
7. 测试命令模型
首先,为OrderAggregate
创建测试用的FixtureConfiguration
:
private FixtureConfiguration<OrderAggregate> fixture;
@Before
public void setUp() {
fixture = new AggregateTestFixture<>(OrderAggregate.class);
}
第一个测试用例覆盖最简单场景:聚合处理CreateOrderCommand
时应产生OrderCreatedEvent
:
String orderId = UUID.randomUUID().toString();
String productId = "Deluxe Chair";
fixture.givenNoPriorActivity()
.when(new CreateOrderCommand(orderId, productId))
.expectEvents(new OrderCreatedEvent(orderId, productId));
接下来测试"仅确认后才能发货"的决策逻辑。这产生两种场景:预期抛异常,或预期产生OrderShippedEvent
。
先看预期异常的场景:
String orderId = UUID.randomUUID().toString();
String productId = "Deluxe Chair";
fixture.given(new OrderCreatedEvent(orderId, productId))
.when(new ShipOrderCommand(orderId))
.expectException(UnconfirmedOrderException.class);
再看预期产生OrderShippedEvent
的场景:
String orderId = UUID.randomUUID().toString();
String productId = "Deluxe Chair";
fixture.given(new OrderCreatedEvent(orderId, productId), new OrderConfirmedEvent(orderId))
.when(new ShipOrderCommand(orderId))
.expectEvents(new OrderShippedEvent(orderId));
8. 查询模型 – 事件处理器
目前,我们已通过命令和事件建立了核心API,并实现了CQRS订单服务的命令模型OrderAggregate
。
接下来,考虑应用应提供的查询模型之一。
其中之一是订单(Order)模型:
public class Order {
private final String orderId;
private final String productId;
private OrderStatus orderStatus;
public Order(String orderId, String productId) {
this.orderId = orderId;
this.productId = productId;
orderStatus = OrderStatus.CREATED;
}
public void setOrderConfirmed() {
this.orderStatus = OrderStatus.CONFIRMED;
}
public void setOrderShipped() {
this.orderStatus = OrderStatus.SHIPPED;
}
// getter、equals/hashCode和toString方法
}
public enum OrderStatus {
CREATED, CONFIRMED, SHIPPED
}
我们将基于系统传播的事件更新此模型。使用Spring Service bean更新模型即可:
@Service
public class OrdersEventHandler {
private final Map<String, Order> orders = new HashMap<>();
@EventHandler
public void on(OrderCreatedEvent event) {
String orderId = event.getOrderId();
orders.put(orderId, new Order(orderId, event.getProductId()));
}
// OrderConfirmedEvent和OrderShippedEvent的事件处理器...
}
由于使用了axon-spring-boot-starter
初始化Axon应用,框架会自动扫描所有bean的消息处理函数。
OrdersEventHandler
包含@EventHandler
注解的方法来存储和更新Order
,框架会自动将其注册为事件接收类,无需额外配置。
9. 查询模型 – 查询处理器
接下来,为查询模型(如检索所有订单)引入查询消息:
public class FindAllOrderedProductsQuery { }
其次,更新OrdersEventHandler
以处理FindAllOrderedProductsQuery
:
@QueryHandler
public List<Order> handle(FindAllOrderedProductsQuery query) {
return new ArrayList<>(orders.values());
}
@QueryHandler
注解的方法将处理FindAllOrderedProductsQuery
,并返回List<Order>
,与典型的"查找所有"查询一致。
10. 整合所有组件
我们已通过命令、事件和查询完善了核心API,并通过OrderAggregate
和Order
模型建立了命令和查询模型。
接下来整合基础设施。由于使用axon-spring-boot-starter
,许多必需配置已自动完成。
首先,为聚合根启用事件溯源,需要EventStore。第三步启动的Axon Server正好满足此需求。
其次,需要存储Order
查询模型的机制。本例添加h2内存数据库和spring-boot-starter-data-jpa简化操作:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
10.1. 设置REST接口
通过添加spring-boot-starter-web依赖暴露REST接口:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
通过REST接口分发命令和查询:
@RestController
public class OrderRestEndpoint {
private final CommandGateway commandGateway;
private final QueryGateway queryGateway;
// 自动装配的构造方法和POST/GET接口
}
CommandGateway用于发送命令消息,QueryGateway用于发送查询消息。相比它们连接的CommandBus和QueryBus,网关提供了更简洁的API。
OrderRestEndpoint
应包含创建、确认和发货订单的POST接口:
@PostMapping("/ship-order")
public CompletableFuture<Void> shipOrder() {
String orderId = UUID.randomUUID().toString();
return commandGateway.send(new CreateOrderCommand(orderId, "Deluxe Chair"))
.thenCompose(result -> commandGateway.send(new ConfirmOrderCommand(orderId)))
.thenCompose(result -> commandGateway.send(new ShipOrderCommand(orderId)));
}
这完成了CQRS应用的命令端。注意网关返回CompletableFuture
,支持异步操作。
最后是查询所有订单的GET接口:
@GetMapping("/all-orders")
public CompletableFuture<List<Order>> findAllOrders() {
return queryGateway.query(new FindAllOrderedProductsQuery(), ResponseTypes.multipleInstancesOf(Order.class));
}
GET接口中,我们使用QueryGateway
分发点对点查询。创建默认的FindAllOrderedProductsQuery
时,需指定预期返回类型。
由于预期返回多个Order
实例,使用静态方法ResponseTypes#multipleInstancesOf(Class)
。至此,我们为订单服务的查询端提供了基础入口。
所有组件已就绪,启动OrderApplication
后,可通过REST控制器发送命令和查询。
POST请求到/ship-order
接口会实例化OrderAggregate
并发布事件,进而保存/更新Order
。GET请求到/all-orders
接口会发布查询消息,由OrdersEventHandler
处理并返回所有现有订单。
11. 结论
本文介绍了Axon框架作为构建CQRS和事件溯源应用的强大基础。
我们使用框架实现了一个简单的订单服务,展示了此类应用的实际结构。
最后,Axon Server作为事件存储和消息路由机制,极大简化了基础设施。
更多相关问题,请参考Discuss AxonIQ。