这篇文章内容会很短,主要是想给大家分享下我最近在做一个简单的rabbitmq客户端类库的封装的经验总结,说是简单其实一点都不简单。为了节省时间我主要按照Library的执行顺序来介绍,在你看来这里仅仅是一个简单的经验总结,但是在我看来这些经验只有在你真正的封装rabbitmq客户端库的时候且将你的客户端安全稳定的发布上线后才会真的发现这些问题。
比如你的库只是链接单个Node的时候和链接高可用集群的HAProxy时候是完全两回事。当你未能在你的库里使用反向注入LOG接口的时候一旦在线上发生网络解析和序列化等一系列在线问题时候你是多么无能为力。当你使用同步循环获取队列消息的时候一旦发生异常你的链接就会断掉等等这些细节。我总结了我在编写这个library的时候慢慢稳定下来的过程和经验。至少目前来看网络上的文章,当然我是指.NET/C#方面的,都没有讲到这些问题,大部分的文章都是简单的介绍了一个最最基本的使用和最最基本的demo而已,达不到企业级使用的要求。在这个过程中,感谢我的团队和给过我指导的同事,让我明白了一些技术道理。
好东西不能石沉大海,尤其是.NET领域更需要这样的东西来填补这一空缺。废话不多说了,进入主题,那些编写框架和组件的大道理这里就不讲了,我只说重点。
就是说你的接受Channel和发送的Channel要分离开,如果不分开会出现偶发性的消息串掉的错误,我这里现在没有环境无法重现截图。我是在做压力测试的时候,用了一个Channel的时候Debug抛出来的异常。如果你有洁癖建议把IConnection也分离开。这样不容易出错,就算出错排错也会很容易。
(图1:分开接受和发送的IConnection、Channel)
还有一点,不要将这些对象直接散落在直接使用的Client类中,要建立起一个使用上下文,就算你暴露在外面的是一个具体的类但是那个类也是一个空壳子。
我们可以在创建队列的时候设置此队列是持久化的,但是队列中的消息要在我们发送某个消息的时候打上需要持久化的状态标记。
(图2:标记此消息是需要持久化的)
(图1:在线程内部方法中加try{}catch{})
这个点很多做封装的人会容易忽视掉,我这里补充下为了保持这个文章的完整性。
其实在我之前的“ .NET应用架构设计—服务端开发多线程使用小结(多线程使用常识) ”一文中有讲到过。
这个时候你的try{}catch{}其实是不会捕获到任何ListenInit方法中的异常的,因为他在另外一个线程上下文中执行的。具体原理这里就不解释了。但是可以很容易的理解就是,你这个方法一旦执行就会立马返回了。
(图4:监听Shutdown事件,记录下LOG便于排查和监管服务的稳定性)
(图5:组件内部的LOG接口)
此接口就是你内部用来将信息传输出去的渠道,而且这个渠道是活的,有各个应用系统决定怎么记录。
简单处理你还需要一个LOG接口服务定位器对象,要不然你拿不到这个接口实例。
(图6:LOG location对象)
如果我们是使用死循环的方式在接受消息,那么一旦当你的接受消息的程序出现异常那么你的while直接就会跳出,你的链接可能是还链接在服务器上但是你的channel已经断开,说白了你的消息是不会接受到的,而且这样的开发方法很不稳定也不优雅。我们可以使用面向事件的消费者来接受消息。
(图7:使用Eventing类型的消费者接受消息)
默认情况下,一个队列里不管多少消息当你一个TCP连接打上去之后会LOCK住所有的消息,也就是说一个连接彻底占用了所有的消息,此时消息不会被其他集群的机器消费。
(图8:一次只取一个消息进行消费)
但是如果你对消息的处理的前后顺序有要求就不能这么做,你需要独立注册一个队列,然后将这样的一此只消费一个消息配置话。
(图9:创建出一个会自动重连的Connection对象)
(图10:设置心跳超时时间)
如果你连接单台节点的时候不设置这个值是没问题的,但是如果你连接的是类似HAProxy虚拟节点的时候就会出现TCP被断开的可能性。如果你不设置这个心跳超时时间,它默认是不进行心跳保持的,就会出现网络中的某个设置断开空闲的TCP连接资源。就这个问题一直搞的我们的团队到第二天两点钟。大家要记住这个点。
(图11:重新放入队列,推送给其他消费着)
最后,我是基于Rabbitmq.Client 版本3.5.3.0的基础上开发的,这个大家要注意。版本不一样会有一定的差异性。希望此文对大家在使用rabbitmq的同志有一点帮助,谢谢。