接前一篇文 ,谈谈 skynet 消息队列的一些新想法。
之前谈到,每个服务的消息接收队列可以是定长的,且不必太长。因为正常运行中,每个服务都应该尽量消化掉需要处理的消息,否则会预示着某种上层设计的问题。
但是,在接收队列满的时候直接丢掉消息显然是不合理的。那意味着必须有更健全的错误传播机制,让发送失败方可以出错而中断业务。允许发送消息出错可能使上层结构设计更难。
让发送方阻塞在 skynet 中显然也不是个好方案。因为 skynet 的服务是允许阻塞时重入执行另一条新 session 的,这是和 erlang 最大的不同。这可以让单个 lua vm 的性价比更高,可以在要需要的时候,做共享状态,而不必全部业务都通过相对低效的消息通讯来完成;但其负面代价是重入会引发一些隐讳的 bug 。很多已有的 skynet 项目都依赖 send 消息不阻塞这点来保证逻辑正确,不能轻易修改。
我的解决方案是给每个服务再做一组发送队列。最接收方忙的时候,把待发消息放在自己这里的发送队列中。这样就可以由框架来确保消息都能正确的依次发送(这里不保证目的地不同的消息的先后次序,但保证目的地相同的消息次序)。
这两天,我仔细考虑了这个方案。从单一接收队列改为一个接收队列和若干发送队列。实现复杂度一定是增加了,但并没有达到不可接受的程度。这个方案并不难想到,也不难做到,但 3 年前实现 skynet 时我并没有这么做,一定是有些原因吧。
其中一个核心问题是,处理 IO 和 timer 的线程不同于普通的服务,它们是和系统打交道的。也按类似的机制做就不太合理了。它们要做的是尽量匀速的接收外部消息,并马上转发到 skynet 内部,它们是面对几乎所有 skynet 内部服务高频工作的。不应该有复杂的调度机制。
ps. IO 和 timer 线程也不是普通的 skynet 服务。我曾经把 IO 做成一个独立服务,后来又放弃了这个做法而移到核心中去。
一个变通的方法是让两个特殊的定制服务和 IO 及 timer 线程对 1 对 1 对接。对接的通道是无限长的,由于只有一个读取方一个接收方,这个消息队列在实现时也可以有针对性。
另一个问题是,在服务退出时如果处理遗留消息。
我们必须让未处理的请求被妥善回应。在 skynet 1.0 中,这个步骤是在上层完成的。lua 层会在 exit 时遍历所有没有回应的请求,发送一个 error 消息。
而当发送消息不再保证送达时,问题就变得有点棘手。在新方案中,未发出的消息是暂存在自己的发送队列中的,一旦自己都不在,谁来保证这些消息送达呢?
同样的问题还有:当暂发请求真正发出的时候,对方已经退出,需要重新产生一个错误回应。
我的解决方案是:首先在底层就严格区分请求/回应/单向推送/错误传播 这些消息,可以直接在底层做出合理处理;然后让服务的销毁也严格放在唯一一个服务中进行。在销毁过程中,收集待发消息队列,采集这些消息,然后将需要处理的放在自己这里,之后持续发送。
关于服务的创建和销毁部分。我有许多新想法,打算另开一篇 blog 来记录。
总结一下,这篇主要谈消息队列的新设计。在消息队列方面,我计划按需定制三类队列。
第一,多写一读的固定长度并发队列。由于只有一个读者,且队列长度固定不变。所以在出队列的一端是不需要任何锁的。锁只放在进队列的一端。但这里并不需要无锁设计来减少 spin lock 的盲等。因为任何一个写入者碰到队列满或队列有别的写入者正在操作,都可以一致视为队列忙,不必反复重试。它只需要把待发数据放在自己的发送队列即可。
第二,读写全在一起的无并发队列。用来暂存在接收方忙时的待发数据(还会用于服务销毁时收集待办业务)。这个队列可以在 OOM 未发生前无限增长。因为无并发情况,很容易实现正确。每个服务将配备多个这样的队列。这个待发队列不会用得太多,所以默认长度会很短,多个队列也不需要用 hash 表索引,简单一个数组即可。需要用时直接 O(n) 遍历(n 不会太大,因为过载可能的服务并不会太多,多的话系统根本不能正常工作)。
第三,专用于 IO 线程及 timer 线程和 skynet 内部服务对接的一读一写队列(管道)。写入方和读取方分属不同线程。这个队列有可能按需增长。这个可以用一个读写锁来保证并发正确。写锁只发生在写入队列满时,读取方每次读操作都需申请读锁。
最后,昨天我实在忍不住光在脑子里想,刷了一天的代码(大约 1000 行完全新写的)。
好吧,我食言了,等不到年后了。skynet 2.0 重写计划开始了一天。新代码暂时只能编译通过,还不完整,不能运行。我暂放在一个 临时仓库 里,等合适的时候再合并到 skynet 主仓库的 2.0 分支上。
有兴趣的同学可以帮忙 review 一下,印证一下这两篇 blog 的想法的具体实现。