1. 概述

本文将探讨Axon框架,并展示它如何帮助我们实现基于CQRS(命令查询职责分离)和事件溯源的应用。

我们将同时使用Axon Framework和Axon Server。前者包含我们的实现代码,后者则作为专用的事件存储和消息路由解决方案。

示例应用聚焦于订单(Order)领域。我们将充分利用Axon提供的CQRS和事件溯源构建块

注意:许多共享概念直接源自DDD(领域驱动设计),但这超出了本文范围。

2. Maven依赖

创建一个Axon/Spring Boot应用,需要在pom.xml中添加最新的axon-spring-boot-starter依赖,以及用于测试的axon-test依赖。为确保版本一致,在依赖管理部分添加axon-bom

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.axonframework</groupId>
            <artifactId>axon-bom</artifactId>
            <version>4.9.3</version>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.axonframework</groupId>
        <artifactId>axon-spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.axonframework</groupId>
        <artifactId>axon-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

3. Axon Server

我们将使用Axon Server作为事件存储和专用的命令、事件及查询路由解决方案。

作为事件存储,它提供了存储事件所需的理想特性。这篇文章解释了为什么这很重要。

作为消息路由解决方案,它允许我们连接多个实例,而无需配置RabbitMQ或Kafka主题来共享和分发消息。

Axon Server可从此处下载。它是一个简单的JAR文件,通过以下命令启动:

java -jar axonserver.jar

这将启动一个可通过localhost:8024访问的Axon Server实例。该接口提供已连接应用及其可处理消息的概览,以及查询Axon Server内事件存储的机制。

Axon Server的默认配置配合axon-spring-boot-starter依赖,将确保我们的订单服务自动连接到它。

4. 订单服务API – 命令

我们将以CQRS思想构建订单服务,因此重点在于流经应用的消息。

首先定义命令(Commands),即意图的表达式。订单服务可处理三种操作:

  1. 创建新订单
  2. 确认订单
  3. 发货订单

自然地,领域需要处理三个命令消息:CreateOrderCommandConfirmOrderCommandShipOrderCommand

public class CreateOrderCommand {
 
    @TargetAggregateIdentifier
    private final String orderId;
    private final String productId;
 
    // 构造方法、getter、equals/hashCode和toString方法
}
public class ConfirmOrderCommand {
 
    @TargetAggregateIdentifier
    private final String orderId;
    
    // 构造方法、getter、equals/hashCode和toString方法
}
public class ShipOrderCommand {
 
    @TargetAggregateIdentifier
    private final String orderId;
 
    // 构造方法、getter、equals/hashCode和toString方法
}

@TargetAggregateIdentifier注解告诉Axon:标注字段是目标聚合根的ID,命令应路由至该聚合根。稍后我们会简要介绍聚合根。

注意:命令中的字段标记为final是故意的,这是所有消息实现的最佳实践——保持不可变性

5. 订单服务API – 事件

聚合根将处理命令,因为它负责决定订单能否被创建、确认或发货。

它通过发布事件通知应用其余部分其决策。我们定义三种事件:OrderCreatedEventOrderConfirmedEventOrderShippedEvent

public class OrderCreatedEvent {
 
    private final String orderId;
    private final String productId;
 
    // 默认构造方法、getter、equals/hashCode和toString方法
}
public class OrderConfirmedEvent {
 
    private final String orderId;
 
    // 默认构造方法、getter、equals/hashCode和toString方法
}
public class OrderShippedEvent { 

    private final String orderId; 

    // 默认构造方法、getter、equals/hashCode和toString方法 
}

6. 命令模型 – 订单聚合根

既然已用命令和事件建模了核心API,现在可以开始构建命令模型。

聚合根是命令模型中的常规组件,源自DDD。其他框架也使用此概念,如这篇文章所示。

由于领域聚焦于订单处理,我们将创建OrderAggregate作为命令模型的核心

6.1. 聚合根类

创建基础聚合根类:

@Aggregate
public class OrderAggregate {

    @AggregateIdentifier
    private String orderId;
    private boolean orderConfirmed;

    @CommandHandler
    public OrderAggregate(CreateOrderCommand command) {
        AggregateLifecycle.apply(new OrderCreatedEvent(command.getOrderId(), command.getProductId()));
    }

    @EventSourcingHandler
    public void on(OrderCreatedEvent event) {
        this.orderId = event.getOrderId();
        orderConfirmed = false;
    }

    protected OrderAggregate() { }
}

@Aggregate注解是Axon Spring特有的,标记此类为聚合根。它将通知框架:需要为该OrderAggregate实例化CQRS和事件溯源所需的构建块。

由于聚合根处理针对特定实例的命令,需用@AggregateIdentifier注解指定标识符。

聚合根在处理OrderAggregate的"命令处理构造函数"中的CreateOrderCommand时启动生命周期。为告知框架某方法可处理命令,我们添加@CommandHandler注解。

处理CreateOrderCommand时,通过发布OrderCreatedEvent通知应用其余部分订单已创建。要从聚合根内发布事件,使用AggregateLifecycle#apply(Object...)

现在可以开始引入事件溯源:基于事件流重建聚合根实例。

从"聚合创建事件"OrderCreatedEvent开始,由@EventSourcingHandler注解的方法处理,以设置订单聚合根的orderIdorderConfirmed状态。

注意:Axon要求存在默认构造函数,以便基于事件重建聚合根。

6.2. 聚合根命令处理器

现在基础聚合根已就绪,实现剩余命令处理器:

@CommandHandler 
public void handle(ConfirmOrderCommand command) { 
    if (orderConfirmed) {
        return;
    }
    apply(new OrderConfirmedEvent(orderId)); 
} 

@CommandHandler 
public void handle(ShipOrderCommand command) { 
    if (!orderConfirmed) { 
        throw new UnconfirmedOrderException(); 
    } 
    apply(new OrderShippedEvent(orderId)); 
} 

@EventSourcingHandler 
public void on(OrderConfirmedEvent event) { 
    orderConfirmed = true; 
}

命令和事件处理器的签名简化为handle({命令})on({事件})以保持简洁。

此外,我们定义了订单只能确认一次,且仅在确认后才能发货。因此前者忽略命令,后者未满足条件时抛出UnconfirmedOrderException

这体现了OrderConfirmedEvent事件处理器更新订单聚合根orderConfirmed状态为true的必要性。

7. 测试命令模型

首先,为OrderAggregate创建测试用的FixtureConfiguration

private FixtureConfiguration<OrderAggregate> fixture;

@Before
public void setUp() {
    fixture = new AggregateTestFixture<>(OrderAggregate.class);
}

第一个测试用例覆盖最简单场景:聚合处理CreateOrderCommand时应产生OrderCreatedEvent

String orderId = UUID.randomUUID().toString();
String productId = "Deluxe Chair";
fixture.givenNoPriorActivity()
  .when(new CreateOrderCommand(orderId, productId))
  .expectEvents(new OrderCreatedEvent(orderId, productId));

接下来测试"仅确认后才能发货"的决策逻辑。这产生两种场景:预期抛异常,或预期产生OrderShippedEvent

先看预期异常的场景:

String orderId = UUID.randomUUID().toString();
String productId = "Deluxe Chair";
fixture.given(new OrderCreatedEvent(orderId, productId))
  .when(new ShipOrderCommand(orderId))
  .expectException(UnconfirmedOrderException.class);

再看预期产生OrderShippedEvent的场景:

String orderId = UUID.randomUUID().toString();
String productId = "Deluxe Chair";
fixture.given(new OrderCreatedEvent(orderId, productId), new OrderConfirmedEvent(orderId))
  .when(new ShipOrderCommand(orderId))
  .expectEvents(new OrderShippedEvent(orderId));

8. 查询模型 – 事件处理器

目前,我们已通过命令和事件建立了核心API,并实现了CQRS订单服务的命令模型OrderAggregate

接下来,考虑应用应提供的查询模型之一

其中之一是订单(Order)模型:

public class Order {

    private final String orderId;
    private final String productId;
    private OrderStatus orderStatus;

    public Order(String orderId, String productId) {
        this.orderId = orderId;
        this.productId = productId;
        orderStatus = OrderStatus.CREATED;
    }

    public void setOrderConfirmed() {
        this.orderStatus = OrderStatus.CONFIRMED;
    }

    public void setOrderShipped() {
        this.orderStatus = OrderStatus.SHIPPED;
    }

    // getter、equals/hashCode和toString方法
}
public enum OrderStatus {
    CREATED, CONFIRMED, SHIPPED
}

我们将基于系统传播的事件更新此模型。使用Spring Service bean更新模型即可:

@Service
public class OrdersEventHandler {

    private final Map<String, Order> orders = new HashMap<>();

    @EventHandler
    public void on(OrderCreatedEvent event) {
        String orderId = event.getOrderId();
        orders.put(orderId, new Order(orderId, event.getProductId()));
    }

    // OrderConfirmedEvent和OrderShippedEvent的事件处理器...
}

由于使用了axon-spring-boot-starter初始化Axon应用,框架会自动扫描所有bean的消息处理函数。

OrdersEventHandler包含@EventHandler注解的方法来存储和更新Order,框架会自动将其注册为事件接收类,无需额外配置。

9. 查询模型 – 查询处理器

接下来,为查询模型(如检索所有订单)引入查询消息:

public class FindAllOrderedProductsQuery { }

其次,更新OrdersEventHandler以处理FindAllOrderedProductsQuery

@QueryHandler
public List<Order> handle(FindAllOrderedProductsQuery query) {
    return new ArrayList<>(orders.values());
}

@QueryHandler注解的方法将处理FindAllOrderedProductsQuery,并返回List<Order>,与典型的"查找所有"查询一致。

10. 整合所有组件

我们已通过命令、事件和查询完善了核心API,并通过OrderAggregateOrder模型建立了命令和查询模型。

接下来整合基础设施。由于使用axon-spring-boot-starter,许多必需配置已自动完成。

首先,为聚合根启用事件溯源,需要EventStore第三步启动的Axon Server正好满足此需求。

其次,需要存储Order查询模型的机制。本例添加h2内存数据库和spring-boot-starter-data-jpa简化操作:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>

10.1. 设置REST接口

通过添加spring-boot-starter-web依赖暴露REST接口:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

通过REST接口分发命令和查询:

@RestController
public class OrderRestEndpoint {

    private final CommandGateway commandGateway;
    private final QueryGateway queryGateway;

    // 自动装配的构造方法和POST/GET接口
}

CommandGateway用于发送命令消息,QueryGateway用于发送查询消息。相比它们连接的CommandBusQueryBus,网关提供了更简洁的API。

OrderRestEndpoint应包含创建、确认和发货订单的POST接口:

@PostMapping("/ship-order")
public CompletableFuture<Void> shipOrder() {
    String orderId = UUID.randomUUID().toString();
    return commandGateway.send(new CreateOrderCommand(orderId, "Deluxe Chair"))
                         .thenCompose(result -> commandGateway.send(new ConfirmOrderCommand(orderId)))
                         .thenCompose(result -> commandGateway.send(new ShipOrderCommand(orderId)));
}

这完成了CQRS应用的命令端。注意网关返回CompletableFuture,支持异步操作。

最后是查询所有订单的GET接口:

@GetMapping("/all-orders")
public CompletableFuture<List<Order>> findAllOrders() {
    return queryGateway.query(new FindAllOrderedProductsQuery(), ResponseTypes.multipleInstancesOf(Order.class));
}

GET接口中,我们使用QueryGateway分发点对点查询。创建默认的FindAllOrderedProductsQuery时,需指定预期返回类型。

由于预期返回多个Order实例,使用静态方法ResponseTypes#multipleInstancesOf(Class)。至此,我们为订单服务的查询端提供了基础入口。

所有组件已就绪,启动OrderApplication后,可通过REST控制器发送命令和查询。

POST请求到/ship-order接口会实例化OrderAggregate并发布事件,进而保存/更新Order。GET请求到/all-orders接口会发布查询消息,由OrdersEventHandler处理并返回所有现有订单。

11. 结论

本文介绍了Axon框架作为构建CQRS和事件溯源应用的强大基础。

我们使用框架实现了一个简单的订单服务,展示了此类应用的实际结构。

最后,Axon Server作为事件存储和消息路由机制,极大简化了基础设施。

更多相关问题,请参考Discuss AxonIQ


原始标题:A Guide to the Axon Framework | Baeldung