转载

Axon框架指南 - Baeldung

在本文中,我们将介绍 Axon 以及它如何帮助我们实现具有 CQRS (Command Query Responsibility Segregation)和 Event Sourcing的 应用程序。

在本指南中,将使用Axon Framework和 Axon Server 。前者将包含我们的实现,后者将是我们专用的事件存储和消息路由解决方案。

我们将要构建的示例应用程序专注于Order域。为此,我们将利用Axon为我们提供的CQRS和Event Sourcing构建模块。

请注意,很多共享概念都来自 DDD ,这超出了本文的范围。

Maven依赖

我们将创建一个Axon / Spring Boot应用程序。因此,我们需要将最新的 axon-spring-boot-starter 依赖项添加到我们的pom.xml中,以及用于测试的 axon-test 依赖项:

<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-spring-boot-starter</artifactId>
    <version>4.1.2</version>
</dependency>
 
<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-test</artifactId>
    <version>4.1.2</version>
    <scope>test</scope>
</dependency>

 Axon Server

我们将使用 Axon Server 作为我们的 Event Store 和我们的专用命令,事件和查询路由解决方案。

作为事件存储,它为我们提供了存储事件时所需的理想特性。 该 文章提供了背景为什么这是可取的。

作为消息路由解决方案,它为我们提供了将多个实例连接在一起的选项,而无需专注于配置RabbitMQ或Kafka主题以共享和分发消息。

Axon Server可以在 这里 下载。由于它是一个简单的JAR文件,以下操作足以启动它:

java -jar axonserver.jar

这将启动一个可通过 localhost 访问的Axon Server实例 :8024 。端点提供已连接应用程序及其可以处理的消息的概述,以及Axon Server中包含的事件存储的查询机制。

Axon Server的默认配置与axon-spring-boot-starter依赖关系将确保我们的Order服务将自动连接到它。

订单服务API - 命令

我们将以CQRS为基础设置订单服务。因此,我们将强调流经我们应用程序的消息。

首先,我们将定义命令,即意图的表达。Order服务能够处理三种不同类型的操作:

  1. 下新订单
  2. 确认订单
  3. 发货订单

当然,我们的域可以处理三个命令消息 -  PlaceOrderCommand,ConfirmOrderCommand和ShipOrderCommand:

<b>public</b> <b>class</b> PlaceOrderCommand {
  
    @TargetAggregateIdentifier
    <b>private</b> <b>final</b> String orderId;
    <b>private</b> <b>final</b> String product;
  
    <font><i>// constructor, getters, equals/hashCode and toString </i></font><font>
}
<b>public</b> <b>class</b> ConfirmOrderCommand {
  
    @TargetAggregateIdentifier
    <b>private</b> <b>final</b> String orderId;
     
    </font><font><i>// constructor, getters, equals/hashCode and toString</i></font><font>
}
<b>public</b> <b>class</b> ShipOrderCommand {
  
    @TargetAggregateIdentifier
    <b>private</b> <b>final</b> String orderId;
  
    </font><font><i>// constructor, getters, equals/hashCode and toString</i></font><font>
}
</font>

该 TargetAggregateIdentifier 注解告诉轴突的注释字段是一个给定的聚合ID,以该命令应该有针对性。 我们将在本文后面简要介绍聚合。

另请注意,我们将命令中的字段标记为  final。 这是故意的,因为任何消息实现都是不可变的最佳实践。

订单服务API - 事件

我们的聚合将处理这些命令,因为它负责决定是否可以下达,确认或发送订单。

它将通过发布活动通知其决定的其余部分。我们将有三种类型的事件 -  OrderPlacedEvent,OrderConfirmedEvent和OrderShippedEvent:

<b>public</b> <b>class</b> OrderPlacedEvent {
  
    <b>private</b> <b>final</b> String orderId;
    <b>private</b> <b>final</b> String product;
  
    <font><i>// default constructor, getters, equals/hashCode and toString</i></font><font>
}
<b>public</b> <b>class</b> OrderConfirmedEvent {
  
    <b>private</b> <b>final</b> String orderId;
  
    </font><font><i>// default constructor, getters, equals/hashCode and toString</i></font><font>
}
<b>public</b> <b>class</b> OrderShippedEvent { 
 
    <b>private</b> <b>final</b> String orderId; 
 
    </font><font><i>// default constructor, getters, equals/hashCode and toString </i></font><font>
}
</font>

命令模型 - 订单聚合

现在我们已经根据命令和事件建模了我们的核心API,我们可以开始创建命令模型。

由于我们的领域专注于处理订单,  我们将创建一个OrderAggregate作为我们的命令模型的中心。

聚合类,创建我们的基本聚合类:

@Aggregate
<b>public</b> <b>class</b> OrderAggregate {
 
    @AggregateIdentifier
    <b>private</b> String orderId;
    <b>private</b> <b>boolean</b> orderConfirmed;
 
    @CommandHandler
    <b>public</b> OrderAggregate(PlaceOrderCommand command) {
        AggregateLifecycle.apply(<b>new</b> OrderPlacedEvent(command.getOrderId(), command.getProduct()));
    }
 
    @EventSourcingHandler
    <b>public</b> <b>void</b> on(OrderPlacedEvent event) {
        <b>this</b>.orderId = event.getOrderId();
        orderConfirmed = false;
    }
 
    <b>protected</b> OrderAggregate() { }
}

使用@Aggregate注释标记这个类作为一个聚合体。它将通知框架需要为此OrderAggregate实例化所需的CQRS和Event Sourcing特定构建块。

由于聚合将处理针对特定聚合实例的命令,因此我们需要使用 AggregateIdentifier 注释指定标识符。

在OrderAggregate '命令处理构造函数'中处理PlaceOrderCommand时,我们的聚合将开始其生命周期。为了告诉框架使用指定函数处理命令,我们将添加 CommandHandler 注释。

处理PlaceOrderCommand时,它将通过发布OrderPlacedEvent通知应用程序的其余部分已下达订单。要从聚合中发布事件,我们将使用 AggregateLifecycle #application(Object ...) 。

从这一点开始,我们实际上可以开始将Event Sourcing作为从事件流中重新创建聚合实例的驱动力。

我们从“聚合创建事件”开始,即OrderPlacedEvent,它在 EventSourcingHandler 注释函数中处理,以设置Order聚合的orderId和orderConfirmed状态。

另请注意,为了能够根据事件来源聚合,Axon需要一个默认构造函数。

聚合命令处理程序

现在我们有了基本聚合,我们可以开始实现剩余的命令处理程序:

@CommandHandler
<b>public</b> <b>void</b> handle(ConfirmOrderCommand command) { 
    apply(<b>new</b> OrderConfirmedEvent(orderId)); 
} 
 
@CommandHandler
<b>public</b> <b>void</b> handle(ShipOrderCommand command) { 
    <b>if</b> (!orderConfirmed) { 
        <b>throw</b> <b>new</b> UnconfirmedOrderException(); 
    } 
    apply(<b>new</b> OrderShippedEvent(orderId)); 
} 
 
@EventSourcingHandler
<b>public</b> <b>void</b> on(OrderConfirmedEvent event) { 
    orderConfirmed = <b>true</b>; 
}

我们已经定义订单只有在确认后才能发货。因此,如果不是这种情况,我们将抛出UnconfirmedOrderException。

这表明OrderConfirmedEvent采购处理程序需要将Order聚合的orderConfirmed状态更新为true。

测试命令模型

首先,我们需要创建一个为OrderAggregate测试的配置 FixtureConfiguration :

<b>private</b> FixtureConfiguration<OrderAggregate> fixture;
 
@Before
<b>public</b> <b>void</b> setUp() {
    fixture = <b>new</b> AggregateTestFixture<>(OrderAggregate.<b>class</b>);
}

第一个测试用例应该涵盖最简单的情况。当聚合处理  PlaceOrderCommand时,它应该生成一个  OrderPlacedEvent:

String orderId = UUID.randomUUID().toString();
String product = <font>"Deluxe Chair"</font><font>;
fixture.givenNoPriorActivity()
  .when(<b>new</b> PlaceOrderCommand(orderId, product))
  .expectEvents(<b>new</b> OrderPlacedEvent(orderId, product));
</font>

接下来,我们可以测试只有在确认后能够发送订单的决策逻辑。因此,我们有两个场景 - 一个是我们期望异常的场景,另一个是我们期望  OrderShippedEvent的场景。

让我们看看第一个场景,我们期待一个异常:

String orderId = UUID.randomUUID().toString();
String product = <font>"Deluxe Chair"</font><font>;
fixture.given(<b>new</b> OrderPlacedEvent(orderId, product))
  .when(<b>new</b> ShipOrderCommand(orderId))
  .expectException(IllegalStateException.<b>class</b>);
</font>

现在是第二种情况,我们期待OrderShippedEvent:

String orderId = UUID.randomUUID().toString();
String product = <font>"Deluxe Chair"</font><font>;
fixture.given(<b>new</b> OrderPlacedEvent(orderId, product), <b>new</b> OrderConfirmedEvent(orderId))
  .when(<b>new</b> ShipOrderCommand(orderId))
  .expectEvents(<b>new</b> OrderShippedEvent(orderId));
</font>

查询模型 - 事件处理程序

到目前为止,我们已经使用命令和事件建立了我们的核心API,并且我们拥有CQRS Order服务的Command模型,Order aggregate。

接下来,  我们可以开始考虑我们的应用程序应该服务的查询模型之一。

其中一个模型是OrderedProducts:

<b>public</b> <b>class</b> OrderedProduct {
 
    <b>private</b> <b>final</b> String orderId;
    <b>private</b> <b>final</b> String product;
    <b>private</b> OrderStatus orderStatus;
 
    <b>public</b> OrderedProduct(String orderId, String product) {
        <b>this</b>.orderId = orderId;
        <b>this</b>.product = product;
        orderStatus = OrderStatus.PLACED;
    }
 
    <b>public</b> <b>void</b> setOrderConfirmed() {
        <b>this</b>.orderStatus = OrderStatus.CONFIRMED;
    }
 
    <b>public</b> <b>void</b> setOrderShipped() {
        <b>this</b>.orderStatus = OrderStatus.SHIPPED;
    }
 
    <font><i>// getters, equals/hashCode and toString functions</i></font><font>
}
<b>public</b> enum OrderStatus {
    PLACED, CONFIRMED, SHIPPED
}
</font>

我们将根据通过系统传播的事件更新此模型。用于更新模型的Spring Service bean可以解决这个问题:

@Service
<b>public</b> <b>class</b> OrderedProductsEventHandler {
 
    <b>private</b> <b>final</b> Map<String, OrderedProduct> orderedProducts = <b>new</b> HashMap<>();
 
    @EventHandler
    <b>public</b> <b>void</b> on(OrderPlacedEvent event) {
        String orderId = event.getOrderId();
        orderedProducts.put(orderId, <b>new</b> OrderedProduct(orderId, event.getProduct()));
    }
 
    <font><i>// Event Handlers for OrderConfirmedEvent and OrderShippedEvent...</i></font><font>
}
</font>

由于我们已经使用axon-spring-boot-starter依赖来启动我们的Axon应用程序,因此框架将自动扫描所有bean以查找现有的消息处理函数。

由于  OrderedProductsEventHandler具有用于存储OrderedProduct并更新它的 EventHandler 注释函数,因此该bean将被框架注册为应该接收事件而不需要我们任何配置的类。

查询模型 - 查询处理程序

接下来,要查询此模型,例如,要检索所有已订购的产品,我们应首先向我们的核心API引入一条Query消息:

public class FindAllOrderedProductsQuery { }

其次,我们必须更新OrderedProductsEventHandler才能处理FindAllOrderedProductsQuery:

@QueryHandler
<b>public</b> List<OrderedProduct> handle(FindAllOrderedProductsQuery query) {
    <b>return</b> <b>new</b> ArrayList<>(orderedProducts.values());
}

该 QueryHandler 注释功能将处理FindAllOrderedProductsQuery并设置为返回一个List<OrderedProduct>,类似“find all”查询。

把所有东西放在一起

我们通过命令,事件和查询充实了我们的核心API,并通过OrderAggregate和OrderedProducts模型设置了我们的命令和查询模型。

接下来是绑定我们基础设施的松散端。当我们使用axon-spring-boot-starter时,它会自动设置许多所需的配置。

首先,由于我们想要为我们的聚合利用事件采购,我们需要一个 EventStore 。我们在第三步中启动的Axon Server将填补这个漏洞。

其次,我们需要一种机制来存储我们的OrderedProduct查询模型。对于此示例,我们可以添加 h2 作为内存数据库和 spring-boot-starter-data-jpa 以便于使用:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>

设置REST端点

接下来,我们需要能够访问我们的应用程序,我们将通过添加 spring-boot-starter-web 依赖关系来利用REST端点:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

从我们的REST端点,我们可以开始调度命令和查询:

@RestController
<b>public</b> <b>class</b> OrderRestEndpoint {
 
    <b>private</b> <b>final</b> CommandGateway commandGateway;
    <b>private</b> <b>final</b> QueryGateway queryGateway;
 
    <font><i>// Autowiring constructor and POST/GET endpoints</i></font><font>
}
</font>

该 CommandGateway 被用作机制发送我们的命令消息,以及 QueryGateway ,然后发送查询消息,

与 CommandBus 和 QueryBus 相比,该网关提供了更简单,更直接的API 。

从这里开始,我们的OrderRestEndpoint应该有一个POST端点来放置,确认和发送订单:

@PostMapping(<font>"/ship-order"</font><font>)
<b>public</b> <b>void</b> shipOrder() {
    String orderId = UUID.randomUUID().toString();
    commandGateway.send(<b>new</b> PlaceOrderCommand(orderId, </font><font>"Deluxe Chair"</font><font>));
    commandGateway.send(<b>new</b> ConfirmOrderCommand(orderId));
    commandGateway.send(<b>new</b> ShipOrderCommand(orderId));
}
</font>

这使我们的CQRS应用程序的命令端更加完整。

现在,剩下的就是一个GET端点来查询所有OrderedProducts:

@GetMapping(<font>"/all-orders"</font><font>)
<b>public</b> List<OrderedProduct> findAllOrderedProducts() {
    <b>return</b> queryGateway.query(<b>new</b> FindAllOrderedProductsQuery(), 
      ResponseTypes.multipleInstancesOf(OrderedProduct.<b>class</b>)).join();
}
</font>

在GET端点中,我们利用QueryGateway来分派点对点查询。于是,我们创建一个默认的  FindAllOrderedProductsQuery,但我们还需要指定预期的返回类型。

由于我们期望返回多个OrderedProduct实例,因此我们利用静态 ResponseTypes#multipleInstancesOf(Class) 函数。有了这个,我们为订单服务的查询方面提供了一个基本入口。

我们完成了设置,现在我们可以在启动OrderApplication后通过REST控制器发送一些命令和查询  。

POST到端点/发货订单将实例化一个OrderAggregate,它将发布事件,这反过来将保存/更新我们的OrderedProducts。来自/ all-orders  端点的GET 将发布一个查询消息,该消息将由OrderedProductsEventHandler处理,该消息将返回所有现有的OrderedProducts。

结论

在本文中,我们介绍了Axon Framework作为构建应用程序的强大基础,充分利用了CQRS和Event Sourcing的优势。

我们使用框架实现了一个简单的Order服务,以展示如何在实践中构建这样的应用程序。

最后,Axon Server构成了我们的事件存储和消息路由机制。

可以 在GitHub上 找到所有这些示例和代码片段的实现。

如果您有任何其他问题,请查看 Axon Framework用户组 。

原文  https://www.jdon.com/52836
正文到此结束
Loading...