前一篇文章介绍了Conference案例的架构设计,本篇文章开始介绍Conference案例的代码实现。由于代码比较多,一开始就全部介绍所有细节,估计很多人接受不了,也理解不了。所以,我先进行一次QuickStart的介绍,即选取某个简单典型的场景从前到后过一下每个环节。这样大家就能够快速对代码的重要关键环节有大概的理解。另外,我现在正在做ENode的官网,到时会像axon framework一样,介绍ENode框架本身、使用场景、性能数据、案例,以及论坛社区等功能;
本文打算选择Conference案例中一个不太复杂的场景(发布会议),来快速过一下需要开发者实现的代码环节。
首先,我们来看一下发布一个会议的UI入口,前面的文章介绍过,当客户创建好一个会议后,他可以先编辑会议的所有座位类型,然后如果允许预订者预定了,那他就需要先发布一下这个会议。就像淘宝的商家要上架商品后商品才会对买家可见一样。本质上,发布一个会议,其实就是将会议聚合根的isPublished修改为true。如下图所示:
当客户点击Publish按钮后,前台提交HttpPost请求到服务端,然后请求就会被ASP.NET MVC的Controller处理,Controller的Action的逻辑如下:
上面的代码比较清晰,我们先判断当前的_conference实例是否为空,如果为空,则直接返回HttpNotFound结果。那_conference实例哪里来的?这里考虑到ConferenceController中的大部分Action都要使用当前的_conference实例,所以我们为了代码的复用,在Controller的OnActionExecuting方法里,提前获取了当前的_conference实例,代码我稍后再贴。_conference实例有了之后我们就可以构建一个PublishConference的命令。该命令只需要一个当前要发布的Conference的ID即可。然后,我们调用ExecuteCommandAsync方法,去异步执行一个命令。然后,我们使用await关键字异步等待命令的处理结果。最后判断结果是否成功,做相应的处理。
ExecuteCommandAsync方法:
该方法内部使用_commandService的ExecuteAsync方法来异步执行一个命令。_commandService是ENode框架提供的,用户异步执行一个命令。_commandService则是通过ConferenceController的构造函数注入的,代码如下:
使用ENode框架开发的Controller,一般是需要两个服务,一个是commanService,用于发送命令;另一个是某个QueryService,用语查询数据;这样的设计充分体现了CQRS架构的特点;当然,有时查询服务可能不止一个,那就可以注入多个,看我们自己需要即可。ENode使用的依赖注入框架是Autofac,后面会看到初始化过程。大家可能在想,为何要弄一个ExecuteCommandAsync方法出来呢?因为要处理超时的情况,假如一个命令处理超时了(比如5s),那Controller的Action也需要立即返回了。TimeoutAfter的代码如下:
大家可以看到TimeoutAfter方法内部,为了实现当超过指定时间后要求的Task还未处理完的情况,我们创建了一个延后指定时间执行的Task,最后判断完成的Task是哪个,从而实现超时的处理。这个做法是我在网上找到的,觉得还不错,这个做法可以让我们在实现完全异步的同时还能实现超时处理。最后,我们看一下OnActionExecuting方法:
OnActionExecuting
这个方法的代码的逻辑也比较简单,就是根据HttpRequest中包含的slug参数先获取一个Conference聚合根;如果存在,则进一步根据accessCode参数检查accessCode参数是否合法。通过合法,则认为提供的slug和accessCode有效。大家可以把slug理解为唯一定位一个Conference的,而accessCode是使用该Conference的密码。由于这个只是一个案例,所以我们通过这种简单有效的方法来为用户授权。
了解了Controller的实现,我们接下来看看Command的定义,Command是一个DTO对象。代码如下:
非常简单,由于ENode框架基类提供的Command类已经提供了一个AggregateRootId的属性,所以我们的PublishConference命令无需再定义其他额外的属性了。需要提一下的是,ENode框架要求,所有Command要创建或修改的聚合根的ID必须在Command发送之前赋值,这个是框架的一个约束,我认为这个通常不是问题。如果你希望聚合根的ID是一个long,那也许你需要自己部署一个全局long生成服务了,有兴趣的朋友可以和我交流实现方案。如果你的ID是一个字符串,那用ENode框架提供的ObjectId类即可,它可以帮你自动生成一个24位长度的全局唯一顺序字符串。接下来我们看看Command Handler的实现。
一个CommandHandler中的代码通常是一句话,ENode框架的最大好处是可以让开发者无需关注C端的技术实现,开发者只需要关心如何实现自己的业务逻辑即可。如上图所示,我们会先定义一个ConferenceCommandHandler的类,然后实现ICommandHandler<PublishConference>接口,然后进一步实现对应的Handle方法。在Handle方法内部,我们只需要从当前的上下文根据Command所关联的聚合根ID获取当前要操作的聚合根,然后调用聚合根的方法即可。我们不需要像经典DDD那样把聚合根从IConferenceRepository中取出来,再修改聚合根,再保存聚合根。而且经典DDD往往还会和工作单元(Unit of Work)配合,因为经典DDD,是支持一个应用层的方法同时修改多个聚合根的,而ENode框架是要求一个命令一次只能创建或修改一个聚合根的,即是面向最终一致性的。这点开发者需要明确与了解。从代码实现的角度,我相信ENode框架提供的方式是非常简单和直接的,没有任何多余的东西。大家可以看到使用ENode框架开发,大部分情况是不需要定义Repository的,呵呵。下面我们看看Domain聚合根的实现。
使用ENode框架开发的领域模型,聚合根的实现通常是这样的:
所以,当前我们这里被调用的方法是Publish,该方法内部,先判断当前聚合根是否已经处于发布状态,如果是,则抛出异常即可;当然,你选择忽略也没问题;如果不是,则调用ApplyEvent方法Apply某个领域事件。ApplyEvent的意思是,先找到当前事件对应的Handle方法,然后调用该Handle方法;然后调用完成后,把当前事件放入一个聚合根内部的事件队列中。如果对ENode框架的实现有一定了解的朋友应该知道,ENode在处理一个命令时,先创建一个空的ICommandContext对象,然后传给CommandHandler的Handle方法,然后当Handle方法结束后,ENode框架就能知道当前ICommandContext中有哪些聚合根修改了或创建了(框架要求一个命令一次只能涉及一个聚合根的修改),这点我已经强调过很多遍了,呵呵。然后框架如果拿到了某个修改的聚合根,它就拿出该聚合根里上面提到的内部的事件队列里的事件。然后根据这些事件生成一个EventStream的对象,然后把该对象持久化到EventStore,完成后再Publish该EventStream。这是正常流程,在这里我顺便提一下,为了让大家更好的理解内部实现的机制。通过这些介绍,我相信大家应该至少可以理解上面的Publish方法和Handle方法了吧。
另外,有些朋友可能会想,为何是先产生事件,再修改状态呢?
主要原因是因为这个Handle方法是会在事件溯源(ES)的时候被重复利用的。当我们要从EventStore通过ES还原某个聚合根时,我们是先获取该聚合根所产生的所有的事件,然后对每个事件调用聚合根的对应的Handle方法,从而实现聚合根状态的还原。这个过程也就是我们常说的事件溯源,即ES。需要提醒的是,聚合根应该在产生事件之前把各种业务规则也业务逻辑实现掉,然后只有当前操作满足所有的业务规则时,才调用ApplyEvent方法,然后在所有的Handle方法中,就是仅仅简单的等于号赋值操作,没有任何逻辑。这点非常重要。为什么要这样呢?假如我们把一些业务规则和逻辑放在Handle方法中,比如if怎么样的时候做什么赋值,else的时候做另外的赋值。那假如哪一天我们的Handle方法里的判断逻辑变化了,那我们通过事件溯源还原出来的聚合根的状态就不对了。这点应该不难理解吧。
所以,从哲学的角度来说,EventStore中的事件并不是完整的历史。事件+Handle方法才是完整的历史,两者结合可以完整恢复聚合根的状态到最新状态。所以我们的事件和Handle方法都不能修改,或者如果要修改,也必须确保兼容老的结构和实现,这点非常重要。下面我们来看看Event Handler的实现:
EventHandler的作用是根据C端聚合根产生的事件来更新CQRS的读库。需要注意的是ENode整个框架对外提供的对外API基本都是异步IO的(实际上内部的实现也都是异步IO的)。所以我们更新读库时,需要使用ADO.NET提供的Async方法类更新DB。这里我使用ENode自带的Dapper轻量级高性能ORM来实现对读库的更新。上面的代码中就是更新Conference表的IsPublish字段。但是为了确保避免并发导致的数据覆盖,所以我们需要严谨的利用乐观控制来确保数据不会被覆盖,ENode要求我们使用Version机制来实现乐观锁。这里具体其实还有非常多关于并发更新方面带来的细节。我之前写过一篇文章,大家有兴趣的可以去看一下这篇文章,本文的目的是做一个QuickStart,所以不做过多展开了。TryUpdateRecordAsync方法的内部实现如下,很简单,我就不做介绍了。
还有一点需要特别提一下,就是为何要使用Dapper而不使用EF这种ORM框架。因为ENode框架实现的是CQRS+ES的架构。所以,我们在更新读库时,是根据事件更新读库。那怎么样的更新是最快的呢?就是直接通过Insert或Update语句来更新DB。而如果通过EF这种框架,因为是面向OO的ORM,所以一般是需要先从DB取出数据转换为对象,在更新对象,再保存对象这样的思路。这个过程我个人认为,对于CQRS+ES架构的应用来说,是多余的,不必要的。我们在更新读库时,更好的方式应该是利用像Dapper这样的ORM框架,简单直接的更新读库。我通过对Dapper做了一些简单的二次封装,可以做到用最直接的代码实现目的。兼顾了代码的可读性、可维护性、灵活性,以及性能。同理查询数据时,通过Dapper也非常简单,而且还支持返回dynamic对象。Dapper是基于约定的框架,不需要做ORM映射方面的配置。我个人认为使用在CQRS+ES架构中是非常合理的。所以,对我来说,EF可以退休了,呵呵。
好了,上面介绍了发布会议的所有需要用户写的代码,是不是很简单呢?我个人认为和经典DDD的架构相比,由于有ENode框架的支持,所以开发基于CQRS+ES架构的应用,是非常简单的。下一篇要写什么还没想好,大家还想了解什么,可以及时给我反馈啊。