本系列的上一篇文章重点介绍了 Axon 实现,本文将主要介绍 Spring Cloud 提供的消息中间件的抽象 Spring Cloud Stream 的优化方法。
Spring Cloud Stream
(以下简称 SCS )是 Spring Cloud 提供的消息中间件的抽象,但是目前也就支持 kafka 和 rabbitmq,这篇文章主要会讨论一下如何让 SCS 更好的服务我们之前搭建的 Event Sourcing、CQRS 模型。以下是我在使用 SCS 的过程中存在的一些问题:
StreamListener
为了解决上面的问题,我们可以这么处理,先统一一个入口将 SCS 的消息接收,然后我们自己构建一个路由系统,将请求分发到我们自己定义的注解方法上,并且在这个过程中将 seq 的检查也给做了,大体的流程是这个样子的:
这样以上几点问题都会得到解决,下面我们来看看具体如何实现:
复制代码
@Target( {ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public@interfaceStreamEventHandler { String[] payloadTypes()default{""}; String[] types(); }
types 对应 Stream 本身 Inuput 的类型, payloadTypes 对应事件类型,比如 ContractCreated
,我们要做的效果是这个 payloadTypes 可以不写,直接从方法的第一个参数读取 class 的 simapleName。
复制代码
@Entity @Table(indexes =@Index(columnList ="aggregateIdentifier,type", unique =true)) @Getter @Setter @NoArgsConstructor publicclassDomainAggregateSequence{ @Id @GeneratedValue privateLong id; privateLong sequenceNumber; privateLong aggregateIdentifier; privateString type; } @Repository publicinterfaceDomainAggregateSequenceRepositoryextendsJpaRepository<DomainAggregateSequence,Long>{ /** * 根据 aggregate id 和 type 找到对应的记录 * *@paramidentifier *@paramtype * *@return */ DomainAggregateSequencefindByAggregateIdentifierAndType(Long identifier, String type); }
复制代码
@Slf4j @Component @AllArgsConstructor publicclassStreamDomainEventDispatcherimplementsBeanPostProcessor{ privatefinalObjectMapper mapper; privatefinalDomainAggregateSequenceRepository domainAggregateSequenceRepository; privateHashMap<Object, List<Method>> beanHandlerMap =newHashMap<>(); @Autowired publicStreamDomainEventDispatcher(ObjectMapper mapper, DomainAggregateSequenceRepository domainAggregateSequenceRepository){ this.mapper = mapper; this.domainAggregateSequenceRepository = domainAggregateSequenceRepository; } @Transactional publicvoiddispatchEvent(DomainEvent event, String type){ log.info(MessageFormat.format("message [{0}] received", event.getEventIdentifier())); // 1. 检查是否是乱序事件或者重复事件 Long aggregateIdentifier = Long.parseLong(event.getAggregateIdentifier()); String eventType = event.getType(); Long eventSequence = event.getSequenceNumber(); DomainAggregateSequence sequenceObject = domainAggregateSequenceRepository.findByAggregateIdentifierAndType(aggregateIdentifier, eventType); if(sequenceObject ==null) { sequenceObject =newDomainAggregateSequence(); sequenceObject.setSequenceNumber(eventSequence); sequenceObject.setAggregateIdentifier(aggregateIdentifier); sequenceObject.setType(eventType); }elseif(sequenceObject.getSequenceNumber() +1!= eventSequence) { // 重复事件,直接忽略 if(sequenceObject.getSequenceNumber().equals(eventSequence)) { log.warn(MessageFormat.format("repeat event ignored, type[{0}] aggregate[{1}] seq[{2}] , ignored", event.getType(), event.getAggregateIdentifier(), event.getSequenceNumber())); return; } thrownewStreamEventSequenceException(MessageFormat.format("sequence error, db [{0}], current [{1}]", sequenceObject.getSequenceNumber(), eventSequence)); }else{ sequenceObject.setSequenceNumber(eventSequence); } domainAggregateSequenceRepository.save(sequenceObject); // 2. 分发事件到各个 handler beanHandlerMap.forEach((key, value) -> { Optional<Method> matchedMethod = getMatchedMethods(value, type, event.getPayloadType()); matchedMethod.ifPresent(method -> { try{ invoke(key, method, event); }catch(IllegalAccessException | InvocationTargetException e) { thrownewStreamHandlerException(MessageFormat.format("[{0}] invoke error", method.getName()), e); } }); if(!matchedMethod.isPresent()) { log.info(MessageFormat.format("message [{0}] has no listener", event.getEventIdentifier())); } }); log.info(MessageFormat.format("message [{0}] handled", event.getEventIdentifier())); } @Transactional publicOptional<Method>getMatchedMethods(List<Method> methods, String type, String payloadType){ // 这里应该只有一个方法,因为将 stream 的单个事件分成多个之后,无法保证一致性 List<Method> results = methods.stream().filter(m -> { StreamEventHandler handler = m.getAnnotation(StreamEventHandler.class); List<String> types =newArrayList<>(Arrays.asList(handler.types())); List<String> payloadTypes =newArrayList<>(Arrays.asList(handler.payloadTypes())); types.removeIf(StringUtils::isBlank); payloadTypes.removeIf(StringUtils::isBlank); if(CollectionUtils.isEmpty(payloadTypes) && m.getParameterTypes().length !=0) { payloadTypes = Collections.singletonList(m.getParameterTypes()[0].getSimpleName()); } booleanisTypeMatch = types.contains(type); String checkedPayloadType = payloadType; if(StringUtils.contains(checkedPayloadType,".")) { checkedPayloadType = StringUtils.substringAfterLast(checkedPayloadType,"."); } booleanisPayloadTypeMatch = payloadTypes.contains(checkedPayloadType); returnisTypeMatch && isPayloadTypeMatch; }).collect(Collectors.toList()); if(results.size() >1) { thrownewStreamHandlerException(MessageFormat.format("type[{0}] event[{1}] has more than one handler", type, payloadType)); } returnresults.size() ==1? Optional.of(results.get(0)) : Optional.empty(); } @Transactional publicvoidinvoke(Object bean, Method method, DomainEvent event)throwsIllegalAccessException, InvocationTargetException{ intcount = method.getParameterCount(); if(count ==0) { method.invoke(bean); }elseif(count ==1) { Class<?> payloadType = method.getParameterTypes()[0]; if(payloadType.equals(DomainEvent.class)) { method.invoke(bean, mapper.convertValue(event.getPayload(), DomainEvent.class)); }else{ method.invoke(bean, mapper.convertValue(event.getPayload(), payloadType)); } }elseif(count ==2) { Class<?> payloadType0 = method.getParameterTypes()[0]; Class<?> payloadType1 = method.getParameterTypes()[1]; Object firstParameterValue = mapper.convertValue(event.getPayload(), payloadType0); Object secondParameterValue = event.getMetaData(); // 如果是 DomainEvent 类型则优先传递该类型,另外一个参数按照 payloadType > metaData 优先级传入 if(payloadType0.equals(DomainEvent.class)) { firstParameterValue = mapper.convertValue(event, payloadType0); secondParameterValue = mapper.convertValue(event.getPayload(), payloadType1); } if(payloadType1.equals(DomainEvent.class)) { secondParameterValue = mapper.convertValue(event, payloadType1); } method.invoke(bean, firstParameterValue, secondParameterValue); } } @Override publicObjectpostProcessBeforeInitialization(Object bean, String beanName)throwsBeansException{ returnbean; } @Override publicObjectpostProcessAfterInitialization(Object bean, String beanName)throwsBeansException{ Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass(); Method[] uniqueDeclaredMethods = ReflectionUtils.getUniqueDeclaredMethods(targetClass); List<Method> methods =newArrayList<>(); for(Method method : uniqueDeclaredMethods) { StreamEventHandler streamListener = AnnotatedElementUtils.findMergedAnnotation(method, StreamEventHandler.class); if(streamListener !=null) { methods.add(method); } } if(!CollectionUtils.isEmpty(methods)) { beanHandlerMap.put(bean, methods); } returnbean; } }
这里参照了 SCS 本身手机 handler 的方式,会将有 @StreamEventHandler
注解的方法都找出来做一个记录。在 dispatchEvent 的时候会更新事件的 seq 并且按照 type 去调用各个标有注解的方法。
复制代码
@Slf4j @Component @Transactional @AllArgsConstructor publicclassDomainEventDispatcher{ privatefinalStreamDomainEventDispatcher streamDomainEventDispatcher; @StreamListener(target = ChannelDefinition.CONTRACTS_INPUT, condition ="headers['messageType']=='eventSourcing'") publicvoidhandleBuilding(@Payload DomainEvent event){ streamDomainEventDispatcher.dispatchEvent(event, ChannelDefinition.CONTRACTS_INPUT); } } @Component @AllArgsConstructor @Transactional publicclassContractEventHandler{ @StreamEventHandler(types = ChannelDefinition.CONTRACTS_INPUT) publicvoidhandle(ContractCreatedEvent event){ // 实现你的 view 层更新业务 } }
注意:
AbstractDomainEventDispatcher
中监听所有 bean 加载完成不能用 InitializingBean 接口,否则 @Transactional
会失效,这个有兴趣的同学可以研究一下 @Transactional
的机制。 至此以上几点就优化完了。
基于 SCS 的默认配置,存在一个致命的问题,那就是当消息处理失败(重试三次)之后,消息直接没了,这个相当于就是消息丢失了。那么解决方案其实也是比较简单的,一般有两种解决方案:
方案 1 这么做可能会出的问题就是,这个消息反复消费,反复失败,引起循环问题从而导致服务出现问题,这个就需要在 broker 做一些策略配置了,为了让 broker 尽可能的简单,我们这里采用方案 2,要实现的流程是这样的:
复制代码
spring: application: name:event-sourcing-service datasource: url:jdbc:mysql://localhost:3306/event?useUnicode=true&autoReconnect=true&rewriteBatchedStatements=TRUE username:root password:root jpa: hibernate: ddl-auto:update use-new-id-generator-mappings:false show-sql:false properties: hibernate.dialect:org.hibernate.dialect.MySQL55Dialect rabbitmq: host:localhost port:5672 username:creams_user password:Souban701 cloud: stream.bindings: contract-events:# 这个名字对应代码中 @input("value") 的 value destination:contract-events# 这个对应 rabbit 中的 channel contentType:application/json# 这个指定传输类型,其实可以默认指定,但是目前每个地方都写了,所以统一下 contract-events-input: destination:contract-events contentType:application/json group:event-sourcing-service durableSubscription:true stream.rabbit.bindings.contract-events-input.consumer: autoBindDlq:true republishToDlq:true deadLetterQueueName:contract-error.dlq logging: level.org: springframework: web:INFO cloud.sleuth:INFO apache.ibatis:DEBUG java.sql:DEBUG hibernate: SQL:DEBUG type.descriptor.sql:TRACE axon: serializer: general:jackson
加上这个配置之后,rabbit 会给这个队列创建一个 .dlq 后缀的队列,异常消息都会被塞到这个队列里面(消息中包含了异常信息以及来源),等待我们处理, deadLetterQueueName
指定了 DLQ 的名称,这样所有的失败消息都会存放到同一个 queue 中。大部分的情况下,消息的异常都是由于 consumer 逻辑错误引起的,所以我们需要一个处理这些失败的消息的地方,比如在启动的时候自动拉取 DLQ 中的消息然后转发到原来的 queue 中去远程原有的业务逻辑,如果处理不了那么还是会继续进入到 DLQ 中。
复制代码
@Component publicclassDLXHandlerimplementsApplicationListener<ContextRefreshedEvent>,ApplicationContextAware{ privatefinalRabbitTemplate rabbitTemplate; privateApplicationContext applicationContext; privatestaticfinalString DLQ ="contract-error.dlq"; @Autowired publicDLXHandler(RabbitTemplate rabbitTemplate){ this.rabbitTemplate = rabbitTemplate; } @Override publicvoidsetApplicationContext(ApplicationContext applicationContext){ this.applicationContext = applicationContext; } @Override publicvoidonApplicationEvent(ContextRefreshedEvent event){ // SCS 会创建一个 child context ,这里需要判断下正确的 context 初始化完成 if(event.getApplicationContext().equals(this.applicationContext)) { // 启动后获取 dlq 中所有的消息,进行消费 Message message = rabbitTemplate.receive(DLQ); while(message !=null) { rabbitTemplate.send(message.getMessageProperties().getReceivedRoutingKey(), message); message = rabbitTemplate.receive(DLQ); } } } }
由于 SCS 没有提供给我们类似的接口,这里使用了 rabbitmq 的接口来获取消息。
经常上述这些基础操作之后,汇过来实现 CQRS 就比较清晰了,只需要监听相关的事件,然后更新视图层即可。
复制代码
@StreamEventHandler(types = ChannelDefinition.CONTRACTS_INPUT) publicvoidhandle(ContractCreatedEvent event, DomainEvent<ContractCreatedEvent, HashMap> domainEvent){ QueryContractCommand command =newQueryContractCommand(event.getIdentifier(), domainEvent.getTimestamp()); ContractAggregate aggregate = queryGateway.query(command, ContractAggregate.class).join(); ContractView view =newContractView(); view.setIndustryName(aggregate.getIndustryName()); view.setId(aggregate.getIdentifier()); view.setPartyB(aggregate.getPartyB()); view.setPartyA(aggregate.getPartyA()); view.setName(aggregate.getName()); view.setDeleted(aggregate.isDeleted()); contractViewRepository.save(view); }
StreamDomainEventDispatcher
对传参做了一些处理,当有两个参数的时候会将 DomainEvent
传递,因为有些时候可能会用到一些字段,比如时间、附加信息等等。这里在消费事件的时候,可以根据时间去查询 aggregate 的状态,然后直接做一个映射,也可以根据事件直接对 view 层做 CUD ,个人觉得在性能和速度不存在大问题的时候直接去查询一下 aggregate 当时的状态做一个映射即可,毕竟比较简单。
ContractViewHandler
即可。 完整的例子 - branch session6 周国勇,目前就职于杭州匠人网络创业,致力于楼宇资产管理的 SaaS 化,负责后端业务架构设计、项目管理,喜欢对业务模型的分析,热衷新技术的探索和实践,经常在踩坑的路上越走越远。