在本系列的上一篇文章中,作者介绍了 Spring Cloud 提供的消息中间件的抽象 Spring Cloud Stream 的优化方法,本文将主要介绍如何实现可靠消息。
微服务盛行的时代下,消息成为了不可缺少的组件,首先我们看一个例子,contract 系统创建了一个合同,然后发送创建合同的消息。看似简单,实际上分析一下它的出错可能性,会有以下几种:
同时成功或者同时失败,这个情况是一致的,是正确的,我们需要关心的就是不一致的情况。那么最简单的办法,就是让创建合同和发送消息成为一个事务,要么一起成功,要么一起失败,但是这么做的话耦合性太强,本身合同创建成功了,却因为消息发送的失败强制回滚。这个时候,可能就想到了存储消息数据,将合同创建和消息数据的存储作为一个事务,消息发送成功之后再去删除消息数据,定期去扫描未发送的消息数据,来保证消息的发送。但是这么做还是有一定的代价的,需要实现消息的存储,消息存储和合同创建还是耦合在一起的,不过这样的模式到 Event Sourcing
下面那就比较理想了,因为本身消息数据和 event 是一样的,存储了 event 相当于完成了存储消息数据,只需要在 event 下做一个标记即可。
做完了上面这些,就能保证消息一定从 producer 到 broker 这一过程,当然要做到消息不丢,必然产生的结果就是消息可能会重复,情况就是 broker 收到了消息,但是没有通知到 producer,这种情况下 producer 是认为消息没有投递成功的,会出现重复投递的情况。保证了消息一定送达 broker 之后,就是 consumer 和 broker 的关系了,consumer 在消费之后需要告诉 broker 消费成功,否则 broker 需要一直保存这些消息。当然消费端可能需要做更多的事情,比如保证同一 aggregate 事件的顺序消费。下面文章会以在 Axon 框架上做一些拓展,以分别实现 consumer 和 producer。
上面也说到了,在 Event Sourcing 模式下,我们只需要给事件加上一个是否投递的标志,这里我们就看看如何实现(这里只针对 JPA 做了实现)。
DomainEventEntry
: 复制代码
@Entity(name ="DomainEventEntry") @Getter @Setter @Table(indexes =@Index(columnList ="aggregateIdentifier,sequenceNumber", unique =true)) publicclassCustomDomainEventEntryextendsAbstractSequencedDomainEventEntry<byte[]>{ @NotNull @Column(columnDefinition ="tinyint(1) default 0") privatebooleansent =false; publicCustomDomainEventEntry(DomainEventMessage<?> eventMessage, Serializer serializer){ super(eventMessage, serializer,byte[].class); this.setSent(false); } /** * Default constructor required by JPA */ protectedCustomDomainEventEntry(){ } }
复制代码
publicclassCustomJpaEventStorageEngineextendsJpaEventStorageEngine{ publicCustomJpaEventStorageEngine(Builder builder){ super(builder); } @Override protectedObjectcreateEventEntity(EventMessage<?> eventMessage, Serializer serializer){ returnnewCustomDomainEventEntry(asDomainEventMessage(eventMessage), serializer); } publicstaticCustomJpaEventStorageEngine.Builderbuilder(){ returnnewCustomJpaEventStorageEngine.Builder(); } // 此处略去了 builder 部分代码 publicstaticclassBuilderextendsJpaEventStorageEngine.Builder{ ... } }
复制代码
@Bean publicEventStorageEngineeventStorageEngine(Serializer defaultSerializer, PersistenceExceptionResolver persistenceExceptionResolver, @Qualifier("eventSerializer")Serializer eventSerializer, EntityManagerProvider entityManagerProvider, EventUpcaster contractUpCaster, TransactionManager transactionManager){ returnCustomJpaEventStorageEngine.builder() .snapshotSerializer(defaultSerializer) .upcasterChain(contractUpCaster) .persistenceExceptionResolver(persistenceExceptionResolver) .eventSerializer(eventSerializer) .entityManagerProvider(entityManagerProvider) .transactionManager(transactionManager) .build(); }
做好了准备工作再发送消息就比较清晰了,我们需要做的就是在事件存储后去尝试发送消息,然后标记为已发送即可,在之前的 实现 CQRS
中我们留了一个坑,就是 view 端的更新不是在事件存储之后,这里我们就去实现发消息在事件存储之后,然后 view 层去监听消息更新。具体的实现就是利用 entity postPersist
去监听存储,在 transaction
成功后去尝试发送消息,代码如下:
复制代码
@EntityListeners(CustomDomainEventEntryListener.class) publicclassCustomDomainEventEntryextendsAbstractSequencedDomainEventEntry<byte[]>{ ... } @Component @Slf4j publicclassCustomDomainEventEntryListener{ privatestaticCustomDomainEventEntryRepository customDomainEventEntryRepository; privatestaticContractPublisher contractPublisher; @Autowired publicvoidinit(CustomDomainEventEntryRepository customDomainEventEntryRepository, ContractPublisher contractPublisher){ this.customDomainEventEntryRepository = customDomainEventEntryRepository; this.contractPublisher = contractPublisher; } @PostPersist voidonPersist(CustomDomainEventEntry entry){ TransactionSynchronizationManager.registerSynchronization(newTransactionSynchronizationAdapter() { @Override publicvoidafterCompletion(intstatus){ if(status == TransactionSynchronization.STATUS_COMMITTED) { CompletableFuture.runAsync(() -> sendEvent(entry.getEventIdentifier())); } } }); } @Transactional publicvoidsendEvent(String identifier){ CustomDomainEventEntry eventEntry = customDomainEventEntryRepository.findByEventIdentifier(identifier); if(!eventEntry.isSent()) { contractPublisher.sendEvent(eventEntry); eventEntry.setSent(true); customDomainEventEntryRepository.save(eventEntry); } } } @Repository publicinterfaceCustomDomainEventEntryRepositoryextendsJpaRepository<CustomDomainEventEntry,String>{ /** * 查找事件 * *@paramidentifier * *@return */ CustomDomainEventEntryfindByEventIdentifier(String identifier); } @Component @AllArgsConstructor @Slf4j publicclassContractEventPublisher{ publicvoidsendEvent(DomainEvent event){ // use stream to send message here log.info(MessageFormat.format("prepare to sending message : {0}]",newGson().toJson(event))); } publicvoidsendEvent(CustomDomainEventEntry event){ // use com.craftsman.eventsourcing.stream to send message here ObjectMapper mapper =newObjectMapper(); HashMap payload =null; HashMap metaData =null; try{ payload = mapper.readValue(event.getPayload().getData(), HashMap.class); metaData = mapper.readValue(event.getMetaData().getData(), HashMap.class); }catch(Exception exception) { log.error(MessageFormat.format("byte[] to string failed; exception: {0}", exception)); } if(payload ==null|| metaData ==null) { log.warn(MessageFormat.format("nothing to send; exception: {0}", event.getEventIdentifier())); return; } DomainEvent domainEvent =newDomainEvent( event.getType(), event.getAggregateIdentifier(), event.getPayload().getType().getName(), event.getPayload().getType().getRevision(), event.getSequenceNumber(), event.getEventIdentifier(), event.getTimestamp(), payload, metaData); this.sendEvent(domainEvent); } }
DomainEvent
是为了统一消息的格式包装的类,具体可以看代码这里就不贴了 ContractEventPublisher
作为消息统一发送出口,为了不涉及 rabbitmq
暂时以 log 的形式代替消息发送,后续在 Spring Cloud Stream
优化中实现完整的流程
从 DomainEvent
的属性中,我们可以看到有一个 sequenceNumber
字段,这个字段可以用来保证同一 aggregate 的事件顺序,那么在消费端可以以 type aggregate sequenceNumber 形成一张表,用来记录每个 aggregate 的最新状态,如果 aggregate 数据量比较大,也可以分表存储,一般 aggregate_id
索引之后,单表性能在百万级别,应该都没什么问题。这样在消费的时候先比较 sequenceNumber
差异,只消费差异值为 1 的事件,就可以保证同一 aggregate 的事件被顺序消费。之后会写篇关于 Spring Cloud Stream
的文章,用来作为服务之间的桥梁,并解决框架用 header 作为路由之后引起的问题,这里暂时不做深入。 完整的例子 - branch session7
周国勇,目前就职于杭州匠人网络创业,致力于楼宇资产管理的 SaaS 化,负责后端业务架构设计、项目管理,喜欢对业务模型的分析,热衷新技术的探索和实践,经常在踩坑的路上越走越远。