最近在用ENode开发conference案例时,遇到一个问题,写篇文章分享一下。
Conference案例,是一个关于在线创建会议(类似QCon这种全球开发者大会)、在线管理会议位置信息、在线预订某个会议的位置的,这样一个系统。具体可以看微软的这个项目的主页:http://cqrsjourney.github.io。
然后我们设计了一个Conference聚合根,对应领域中的会议这个领域概念。Conference聚合根下面,有一些位置信息SeatType。一个会议聚合根下面可以添加不同类型的位置,每种类型的位置可以指定可用数量以及价格。所以,Conference是聚合根,conference本身有一些我们所关心的基本属性,同时它内部聚合了一些SeatType子实体。每个SeatType包含了位置的价格、数量这两个信息。
然后,在UI层面,我们会有如下界面边界管理一个会议的所有位置信息。
上图列出了某个会议的两类位置,Quota表示位置的配合数量;当我们要修改某个位置时,可以点击链接,然后出现如下图所示:
出现四个编辑框,我们可以修改任何一个框。修改完后点击保存,我们就能更新某个类型的位置信息了。然后,我们在domain里,设计了两个domain event;分别表示位置基本信息改变和位置配额数量的改变。
为什么要独立出数量改变的domain event呢?因为当用户在前台下单订购位置时,这个数量也会变化。也就是位置数量可能会单独变化。所以,我们考虑单独为位置数量的变化定义一个domain event。
然后,我们目前的代码是,当点击保存时,首先更新会更新位置的基本信息,然后判断数量是否有变化,如果没变化,则只产生位置基本信息变化的domain event;如果有变化,则同时产生位置数量改变的domain event。Conference聚合根相关方法的具体实现如下:
上面的代码的大致意思是,先从聚合内找出需要修改的位置类型,如果不存在就抛异常;如果存在,则先产生位置基本信息改变事件;然后判断数量是否有变化,如果有变化,则继续判断当前输入的数量是否太小。因为太小也是不允许的。
比如,假如用户录入的数量是10,但是当前这种类型的位置已经有11个被预定了,那就不能改为10,而是必须至少为11。最后,如果一切都合法,就产生一个SeatTypeQuantityChanged的事件,表示某个类型的位置的数量发生了变化,同时在事件中带上可预定的剩余位置的数量。
然后读库我们就根据上面这两个事件来更新。
现在的问题是,我们假如两个事件都发生了,那读库要怎么原子更新(在一个事务里更新)?我们的一个event handler只能处理一个event;也就是说,我们会有两个event handler,分别处理对应的事件。由于domain aggregate是一次性原子的方式同时产生两个domain event。所以,我们要确保两个event handler要么都更新成功,要么都不更新成功,这个问题之前没考虑到过。下面我们来想想办法。
想办法把这两个event handler包装在一个事务里,但这要求框架支持这样的事务机制;对框架要求的的改造有点大,不太可行。因为框架要考虑的问题是要更通用的,比如,一旦引入事务,也许还会引入分布式事务等问题。而且这种做法,性能也不高,违反ENode一开始就是为高并发设计的初衷。
要求领域里不要设计两个domain event了,就用一个domain event解决;这个event包含所有信息的修改。这个办法可行,但要求模型做出妥协和让步了。假如有一天我们遇到模型必须要产生多个事件的情况,那怎么办呢?所以,这个思路还是在逃避问题。
不采用事务,而是采用乐观锁的方式解决问题。思路是,框架按照顺序调用这两个event handler,调用的顺序和这两个事件的顺序一致;两个event handler允许不在一个事务里。
这样的问题是,假如第一个事件处理成功了,然后此时机器断电了,第二个事件没被处理,怎么办?那就是要做到,当下一次机器重启后,第二个事件能被处理。然后,因为整个架构是分布式的,所以第一个事件也是有可能被重复处理的,框架在调用event handler时,为了性能方面的考虑,只能尽量保证同一个event不会被同一个event handler重复处理;但是框架有提供机制,让开发人员在event handler内部通过依赖版本号的方式来解决重复处理的问题。所以,总结一下,我们需要处理的问题有以下3个:
为了做到上面这3点,我对ENode做了一个改善,就是为事件引入了一个子版本号的概念。
就是当聚合根每次做出修改后,不管产生多少个domain event,这些domain event都是在一个event stream里;每个event stream都有一个版本号,然后每个domain event的主版本号就是其所在的event stream的版本号。比如某个聚合根某次变化产生了2个domain event,然后这个event stream的版本号为10,那每个domain event的主版本号也是10;这点ENode框架可以做保证。那event stream的版本号哪里来的?就是从聚合根上得来,因为每个聚合根都维护了当前自己的版本号是什么,那它下一次产生新的版本号就是+1即可。
上面解释了什么是事件的主版本号。下面我们在说一下什么是子版本号,子版本号比较简单,就是假如一个event stream里包含2个事件,那第一个事件的子版本号是1,第二个则是2;
然后,有了事件的主版本号和子版本号的概念。我们就可以做到上面的3点要求了。其中的第2点,EQueue会做到确保任何一个消息至少被处理一次,这里不做展开了。第1、3点,我们通过下面的代码结合分析讨论。
为了代码效果好一点,我直接通过截图的方式了,博客园以后官方提供一套这样的代码模板吧,呵呵。@蟋蟀,上次你跟我说的那个模板,我后来忘记使用了:)
上面的代码中,每个event handler内部有一个事务,为什么还需要事务?因为我们现在更新的是聚合根,子实体是聚合根的一部分;所以读库更新时,自然也要更新聚合根本身的。只不过这里只需要更新聚合根的版本号即可。
第一个event handler,我们先启动一个事务,然后先更新聚合根的主版本号,以及次版本号;假如数据库里当前的主版本号是10,次版本号是1,那这个evnt.Version就是11,evnt.Sequence是1,Sequence就是次版本号。然后通过第一条Update SQL我们就能更新聚合根的主版本号以及次版本号了。由于单条update sql是原子事务(无并发问题)的,所以我们只要判断更新的影响行数是否为1。如果是1,则说明更新成功,那就可以更新位置那条记录了。然后,由于这两条更新语句在一个事务里,所以要么全部完成,要么什么都不做。
第二个event handler,同样,我们也是先启动一个事务,然后区别是,因为我们知道SeatTypeQuantityChanged事件和SeatTypeUpdate事件总是在一个事件流里发生的,且它总是位于第二个顺序。所以,当这个event handler被执行时,聚合根的主版本号一定已经是11了,且子版本号是1。那么,我们在第二个event handler中,对聚合根,只需要更新子版本号为2即可。就是第一个Update语句。然后同样判断影响行数是否为1。如果是,则更新位置的数量以及可用数量;如果不是1,则什么都不做。
有一个问题,什么时候会出现不是1呢?就是在这个event handler被重复执行的时候。这种情况,我们忽略即可。因为我们就是为了要做到update的幂等处理。
到这里基本差不多了。但是还需要说明一个大前提。就是上面这个大家可以看到,第一个event handler里,更新聚合根的主版本号时,where条件里会判断聚合根记录的当前版本号时evnt.version - 1;这个就是为了保证,读库更新时,总是按照domain event的发生顺序依次更新的,不能跳过更新,也不能乱序。否则读库的最终数据就不一致了。所以,event handler内部要做这样的判断,确保绝对不会发生这样的事情。但光event handler内部判断还不够。ENode框架也要保证事件消息的处理顺序也是这样依次按照顺序的,否则event handler里聚合根更新的影响行数也许永远都不能为1了。ENode已经做了这样的保证!
上面的最后一个方案,我觉得是比较通用的解决方案。框架不需要做支持跨event handler的事务,改动比较小。同时还能保证读库更新的性能,另外,在断电的时候,也能保证事件被处理。一切的一切都是为了高性能、为了保证最终一致性。
又花了一篇文章分享了一点小小的设计,呵呵。