在任何两个服务之间发送的命令或事件时,通过引入松耦合组件避免点对点直接RPC等同步访问由很多好处。在现代数据架构中,我们经常发现Apache Kafka是所有数据流核心的分布式流媒体平台。这意味着我们需要找到一种方法来更新数据存储,并另外将事件作为消息写入Kafka主题以供其他服务使用。
事实证明,从头开始以可靠和一致的方式实现这一点实际上并非易事:
以可靠和一致的方式实现这一点的某种“低估”方法是通过所谓的“发件箱模式”(也称为 事务发件箱 ),这是微服务架构环境中几种明确定义的模式之一。 Gunnar Morling 在 Debezium博客 上有 一篇 关于 outbox 模式的详细,内容丰富且写得非常好的 文章 。如果您想获得更多背景知识并对此主题进行更深入的调查,这是一个强烈推荐的阅读。此外,它们还使用Java EE技术堆栈提供交钥匙就绪参考实现。
基于略微简化但非常相似的示例,本博客文章讨论了使用不同技术堆栈进行演示微服务的POC实现:
基于事件的通信仍然建立在Apache Kafka与Kafka Connect和Debezium之上。
事件结构
需要写入“发件箱”的每个事件都必须具有某些属性。出于这个原因,有一个名为Outboxable的通用接口:
<b>public</b> <b>interface</b> Outboxable { <font><i>/** * DDD聚合Id,The id of the aggregate affected by a given event. This is also used to ensure strict * ordering of events belonging to one and the same aggregate. */</i></font><font> String getAggregateId(); </font><font><i>/** * The type of the aggregate affected by a given event. This needs to be the same type string for all * related parts of one an the same aggregate that might get changed. */</i></font><font> String getAggregateType(); </font><font><i>/** * The actual event payload as String e.g. JSON serialization. */</i></font><font> String getPayload(); </font><font><i>/** * The (sub)type of an actual event which causes any changes to a specific aggregate type. */</i></font><font> String getType(); </font><font><i>/** * The application timestamp at which the event has happened. */</i></font><font> Long getTimestamp(); } </font>
数据库的事件表结构
订单微服务的数据存储是MySQL。存储任何类型Outboxable事件的相应表结构看起来不足为奇, 按这里所示 。
发件箱事件
需要在数据库中持久保存的每个“outboxable”事件都将转换为@Entity OutboxEvent,它反映了上面显示的结构:
@Entity <b>public</b> <b>class</b> OutboxEvent { @Id @GeneratedValue @Type(type = <font>"uuid-char"</font><font>) <b>private</b> UUID id; @NotNull <b>private</b> String aggregateType; @NotNull <b>private</b> String aggregateId; @NotNull <b>private</b> String type; @NotNull <b>private</b> Long timestamp; @NotNull @Column(length = 1048576) </font><font><i>//e.g. 1 MB max</i></font><font> <b>private</b> String payload; <b>private</b> OutboxEvent() { } </font><font><i>//...</i></font><font> } </font>
发件箱监听器
有一个专门的Spring组件OutboxListener,它负责响应任何“outboxable”事件的调度。它调用OutboxEventRepository用于CRUD,以便预先确定实际的OutboxEvent实体:
@Component <b>public</b> <b>class</b> OutboxListener { <b>private</b> OutboxEventRepository repository; <b>public</b> OutboxListener(OutboxEventRepository repository) { <b>this</b>.repository = repository; } @EventListener <b>public</b> <b>void</b> onExportedEvent(Outboxable event) { OutboxEvent outboxEvent = OutboxEvent.from(event); <font><i>// The outbox event will be written to the "outbox" table</i></font><font> </font><font><i>// and immediately afterwards removed again. Thus the</i></font><font> </font><font><i>// "outbox" table is effectively empty all the time. From a</i></font><font> </font><font><i>// CDC perspective this will produce an INSERT operation</i></font><font> </font><font><i>// followed by a DELETE operation of the same record such that</i></font><font> </font><font><i>// both events are captured from the database log by Debezium.</i></font><font> repository.save(outboxEvent); repository.delete(outboxEvent); } } </font>
实现当然与“outboxable”事件的起源无关,因此,如果事件是通过Spring Data @DomainEvents机制发布还是通过ApplicationEventPublisher手动触发,则无关紧要。
发射Outboxable事件
由于Spring Boot示例使用Spring Data,因此我们可以为PurchaseOrder实体使用@DomainEvents机制。这样做,每次调用相应的PurchaseOrderRepository的save(...)方法时,Spring都会确保发布我们需要通知的有关插入或更新一个这样的实体的任何自定义事件。事实上,这正是我们希望在发件箱模式的上下文中发生的事情。它可以通过遵循下面的代码段中的简单约定轻松实现:
@Entity <b>public</b> <b>class</b> PurchaseOrder { <font><i>//...</i></font><font> @DomainEvents <b>private</b> Collection<Outboxable> triggerOutboxEvents() { <b>return</b> Arrays.asList(OrderUpsertedEvent.of(<b>this</b>)); } </font><font><i>//...</i></font><font> } </font>
通过使用@DomainEvents批注,Spring Data将调用此方法并发布其Collection <Outboxable>返回值中包含的所有事件。上面的代码只使用一个“outboxable” OrderUpsertedEvent来反映实体本身的当前状态:
<b>public</b> <b>class</b> OrderUpsertedEvent implements Outboxable { <b>private</b> <b>static</b> ObjectMapper MAPPER = <b>new</b> ObjectMapper(); <b>private</b> <b>final</b> Long id; <b>private</b> <b>final</b> JsonNode payload; <b>private</b> <b>final</b> Long timestamp; <b>static</b> { MAPPER.registerModule(<b>new</b> JavaTimeModule()); } <b>private</b> OrderUpsertedEvent(Long id, JsonNode payload) { <b>this</b>.id = id; <b>this</b>.payload = payload; <b>this</b>.timestamp = Instant.now().getEpochSecond(); } <b>public</b> <b>static</b> OrderUpsertedEvent of(PurchaseOrder order) { <b>return</b> <b>new</b> OrderUpsertedEvent(order.getId(), MAPPER.valueToTree(order)); } @Override <b>public</b> String getAggregateId() { <b>return</b> String.valueOf(id); } @Override <b>public</b> String getAggregateType() { <b>return</b> PurchaseOrder.<b>class</b>.getName(); } @Override <b>public</b> String getType() { <b>return</b> <b>this</b>.getClass().getName(); } @Override <b>public</b> Long getTimestamp() { <b>return</b> timestamp; } @Override <b>public</b> String getPayload() { <b>try</b> { <b>return</b> MAPPER.writeValueAsString(payload); } <b>catch</b> (JsonProcessingException e) { e.printStackTrace(); } <b>return</b> <b>null</b>; } }
这个演示应用程序使用Jackson并将事件有效负载结构序列化为JSON字符串,但通常任何字符串序列化都可以,例如,利用Base64来支持二进制数据的编码。此处使用名称OrderUpsertedEvent,因为此事件类型实际上将在以下两个条件下发布:a)每次将新的采购订单实体插入到底层的outbox_event表中时b)每次我们更新现有的采购订单实体时。在@Service OrderService的placeOrder(...)方法中,没有此事件的证据,因为它在后台由Spring Data隐式处理。
@Service <b>public</b> <b>class</b> OrderService { <font><i>//...</i></font><font> @Transactional <b>public</b> PurchaseOrder placeOrder(PurchaseOrder order) { repository.save(order); </font><font><i>//NOTE: OrderUpsertedEvent automatically published behind the scenes</i></font><font> <b>return</b> order; } </font><font><i>//...</i></font><font> } </font>
同样重要的是要强调所有与持久性相关的行为都发生在同一个事务性范围内。这保证了ACID属性,因此两个写入 - 完整聚合的插入/更新(订单元数据和订单行详细信息)以及相应的“可开箱的” OrderUpsertedEvent - 一致地应用于数据库或者在错误时一起回滚。
虽然Spring Data @DomainEvents是将这些事件的发布附加到聚合实体以用于通知目的的一种很好的方式,但它们不是特别灵活,也不是那么直接以更细粒度的方式应用,即当我们只想要考虑并通知汇总的某些部分已发生变化。
正是由于这个原因,该演示还采用了另一种方法,通过Spring的ApplicationEventPublisher显式地/手动地发布“outboxable”事件。
@Service <b>public</b> <b>class</b> OrderService { <font><i>//... </i></font><font> @Transactional <b>public</b> PurchaseOrder updateOrderLineStatus(<b>long</b> orderId, <b>long</b> orderLineId, OrderLineStatus newStatus) { PurchaseOrder po = repository.findById(orderId) .orElseThrow(() -> <b>new</b> EntityNotFoundException(</font><font>"order with id "</font><font> + orderId + </font><font>" doesn't exist!"</font><font>)); OrderLineStatus oldStatus = po.updateOrderLine(orderLineId, newStatus); eventBus.publishEvent(OrderLineUpdatedEvent.of(orderId, orderLineId, newStatus, oldStatus)); repository.save(po); <b>return</b> po; } </font><font><i>//...</i></font><font> } </font>
此示例显示如何在完整订单的任何单个订单行更改其状态时触发自定义事件有效内容。因此,在执行更新后,我们发布了一个“outboxable” OrderLineUpdatedEvent来通知订单行状态修改。接下来,通过使用完整聚合显式调用存储库的save(...)方法,@ DomainEvents机制再次隐式发布另一个“可开箱的” OrderUpsertedEvent。
这是一个可选步骤,只有在需要通过每次更改的附加发件箱事件来传达聚合的每个新的完整状态时才会执行。同样,通过使用@Transactional进行注释,我们确保以一致且可靠的方式应用所有更改。
接受和处理发件箱事件
在将Debezium Source Connector安装到Kafka Connect环境后,您可以针对Kafka Connect的REST API发布以下配置,以捕获针对微服务示例的MySQL数据库的“发件箱表”应用的更改:
{ “ name ”: “ mysql-outbox-src-connector-01 ”, “ config ”:{ “ connector.<b>class</b> ”: “ io.debezium.connector.mysql.MySqlConnector ”, “ tasks.max ”: “ 1 ”, “ database.hostname ”: “ localhost ”, “ database.port ”: “ 3306 ”, “ database.user ”: “ debezium ”, “ database.password ”: “ dbz ”, “ database.server.id ”: “ 12345 ”, “ database.server.name ”: “ dbserver1 ”, “ database.whitelist ”: “ outbox-demo ”, “ database.serverTimezone ”: “欧洲/维也纳”, “ table.whitelist ”: “ outbox-demo.outbox_event ”, “ database.history.kafka.bootstrap.servers ”: “ localhost:9092 ”, “ database.history.kafka.topic ”: “ schema-changes.outbox-demo ”, “ tombstones.on.delete ”: “ false ” } }
在Kafka主题中检查原始发件箱事件
当我们运行Spring Boot应用程序时,在启动期间会创建两个带有几个订单行的样本订单。在创建订单后,订单行的状态也会立即更改,以模拟与服务公开的API的交互。这导致将几个事件写入“发件箱表”,然后由Debezium MySQL源连接器捕获。
我们可以在命令行上通过运行以下命令轻松检查写入配置的Kafka主题dbserver1.outbox-demo.outbox_event的消息:
bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic dbserver1.outbox-demo.outbox_event --from-beginning | jq
下面是两个示例消息,一个反映第一个订单的插入,然后相应删除相同的订单,后者再次完成以保持原始“发件箱表”无限增长。
将原始发件箱事件传播到MongoDB
尝试将原始发件箱事件流式传输到运营数据存储时,存在两个主要挑战。首先,大多数接收器都不能正确处理CDC事件,例如Debezium发布的事件,因为它们缺乏对所述事件的必要语义感知。其次,即使他们可以处理它们,它通常也符合CDC识别接收器连接器的兴趣来处理所有不同类型的CDC事件,即INSERT,UPDATE和DELETE。但是,当处理从“发件箱表”派生的CDC事件时,需要进行特殊处理,以允许忽略某些CDC事件类型。具体而言,任何DELETE(如上段所示)都不得反映在接收器中,因为这将始终删除任何先前的插入。
请记住,这源于这样一个事实:原始的“发件箱表”也始终是空的,并且仅用于从数据存储的日志中执行事务感知捕获更改。这里有一个 预览功能 更新了 MongoDB [url=https://github.com/hpgrahsl/kafka-connect-mongodb?source=post_page---------------------------]社区接收[/url] 器连接器 ,以通过特定配置选项允许此类方案。
下面的代码段显示了一个示例配置,它能够处理源自Debezium MySQL源连接器的原始发件箱事件:
{ “ name ”: “ mdb-sink-outbox-raw ”, “ config ”:{ “ key.converter ”: “ io.confluent.connect.avro.AvroConverter ”, “ key.converter.schema.registry.url ”: “ http:<font><i>// localhost:8081 ”,</i></font><font> “ value.converter ”: “ io.confluent.connect.avro.AvroConverter ”, “ value.converter.schema.registry.url ”: “ http:</font><font><i>// localhost:8081 ”,</i></font><font> “ connector.<b>class</b> ”: “ at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector ”, “ topics ”: “ dbserver1.outbox-demo.outbox_event ”, “ mongodb.connection.uri ”: “ mongodb:</font><font><i>// localhost:27017 / outboxed ”,</i></font><font> “ mongodb.collections ”: “发件箱,原料”, “ mongodb.collection.dbserver1.outbox-demo.outbox_event ”: “ outbox -raw ”, “ mongodb.change.data.capture.handler.outbox-raw ”: “ at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsHandler ”, “ mongodb.change.data.capture.handler.operations.outbox-raw ”: “ c ” } } </font>
最重要的部分是最后一个配置条目mongodb.change.data.capture.handler.operations.outbox-raw ,可以配置一系列CDC操作类型:“c,r,u,d”。在这种情况下,我们只对处理“c”类型的操作感兴趣,即INSERT并忽略其他任何“r,u,d”。
根据定义,发件箱表将永远不会经历“u”即UPDATE,但当然它会收到“d”即DELETE用于在编写后立即清理任何事件。通过仅处理INSERT,接收器连接器能够保留源数据存储的原始“发件箱表”中生成的所有原始发件箱事件。
运行此接收器连接器会导致outboxed.outbox-rawMongoDB 中的集合跟踪所有创建的原始发件箱事件。
可以在GitHub上找到所讨论的示例应用程序的 完整源代码 以及Kafka Connector配置。