skynet 本质上只是一个消息分发器,以服务为单位,给每个服务一个独立的 id ,可以从任意服务向另一个服务发送消息。
在此基础上,我们在服务中接入 Lua 虚拟机,并将消息收发的 api 封装成 lua 模块。目前用 lua 编写的服务在最底层只有一个入口,就是接收并处理一条 skynet 框架转发过来的消息。我们可以通过 skynet.core.callback (这是一个内部 API ,用 C 编写,通常由 skynet.start 调用)把一个 lua 函数设置到所属的服务模块中。每个服务必须设置,且只能设置一个回调函数。这个回调函数在每次收到一条消息时,接收 5 个参数:消息类型、消息指针、消息长度、消息 session 、消息来源。
消息大致分两大类,一类是别人对你发起的请求,一类是你过去对外的请求收到的回应。无论是哪类,都是通过同一个回调函数进入。
在实际使用 skynet 时,你可以直接使用 rpc 的语法,向外部服务发起一个远程调用,等对方发送了回应消息后,逻辑接着走下去。那么,框架是如何把回调函数的模式转换为阻塞 API 调用的形式呢?
这多亏了 lua 支持 coroutine 。可以让一段代码运行了一半时挂起,在之后合适的时候在继续运行。
为了实现这点,我们需要在收到每条请求消息时,先创建一个 coroutine ,在 coroutine 中去运行该类消息的 dispatch 函数(使用框架时,可通过 skynet.dispatch 设置消息的处理函数)。之所以必须先创建 coroutine 而不能直接调用消息处理函数,是因为我们无法预知在消息处理的过程中会不会因为阻塞 API 而需要挂起执行流程。等到第一次需要挂起时才把执行流程绑定到 coroutine 上是做不到的。
然后,所有的阻塞 API 都是通过 coroutine.yield 挂起当前 coroutine ,并把挂起类型以及可能用到的数据传出来。而框架会捕获住这些参数,也就进一步知道该去做些什么。这也就解释了阻塞 API 为什么必须在消息处理函数中调用,而不能直接写在服务的主体代码中的原因。因为初始化部分的代码并不运行在框架创建出来的 coroutine 中,coroutine.yield 也就无处捕获处理。
例如,对于 skynet.call ,其实是生成了一个对当前服务来说唯一的 session 号调用 yield 给框架发送 "CALL" 这个指令。框架中的 resume 捕获到 "CALL" 后,就会把 session 和 coroutine 对象记录在表中,然后挂起 coroutine ,结束当前的回调函数。等待 skynet 底层框架后后续消息进来时再处理。(实际上,这里还会处理 skynet.fork 创建的额外线程)
当收到回应消息时,会根据 session 号找到之前记录的 coroutine 对象。然后 resume 之前没有做完的业务即可。从应用层角度看起来,就只是一次阻塞调用而已。
以上仅仅是 skynet 在搭建框架时利用 coroutine 把回调转换为阻塞调用用的一种手法,并不是唯一的方案。比如,你还可以不通过 yield 传递任何数据给框架,全部都塞在额外的 table 中。而调度器仅仅在收到 yield 后结束当前的回调函数就够了。但如果这样做,在满足下面的一个需求时,就会遇到一些小麻烦(可能需要借助一些额外的全局标记变量才能搞定):
如果我们希望在 skynet 框架下的消息函数中使用 coroutine 库怎么办?
也就是把 skynet 框架和正常的 coroutine 机制混合使用。如果你直接使用 coroutine ,那么所有 skynet 的阻塞 API 都不能正常工作了。它们会触发 yield ,而被用户你自己写的 resume 捕获到,而不能正确处理。
这时,我们引入了 skynet.coroutine 这个库。你可以用 skynet.coroutine 全面替代 lua 原生的 coroutine 库,api 是一致的。
它所做的事情就是在 yield coroutine 的时候,在传出参数的最前面加上一个 "USER" 类型(也就是在 skynet 框架下,所有的 yield 都必须在第一个参数给出挂起类型),而在 resume coroutine 时,一旦发现是 "USER" 类型时,就去掉这个类型值,而把剩下的参数直接返回,阻止外框架结束消息处理;而如果是其它类型,则向上传播给框架,让框架挂起当前消息处理流程,等待底层的回应再继续。这种做可以让应用层的 coroutine 看起来不被 skynet 框架的阻塞调用所打断。
当然,这还需要给 skynet.coroutine.status 在 "normal" "running" "suspended" "dead" 等类型之外增加一类 coroutine 状态,叫做 "blocked" 。意思是,coroutine 被底层框架挂起,但不可以由引用层 resume 。
这个状态类似于 "normal" ,是 skynet 框架下的一个特例。因为 skynet 框架下两个独立的请求消息处理流程可视为并行的处理线程。线程之间数据是共享的,也意味着一条线程创建的 coroutine 对象对另一线程可见,也可以调用 resume 。而 "blocked" 状态可以阻止错误的 resume 调用。
实现 skynet.coroutine 封装并不算复杂,里面最大的难点其实在于 pofile 对 skynet 线程的时间分析。 profile 会在 yield 的时候暂停计时,而在 resume 时继续。这样才可能正确统计出一个请求的完整流程到底消耗了多少 CPU 时间。而用户 coroutine 的引入会增加统计的复杂性。我们需要跟踪 resume 的调用,回溯到底是哪条消息间接执行了 coroutine 中的代码,才能正确的把耗时加上去。有兴趣了解细节的同学可以直接阅读代码。
一般我们不太需要直接使用 coroutine 模块。skynet 本身提供了 skynet.fork() 方法来创建一条新的业务线程,可以用 skynet.wait(co) 来挂起,并用 skynet.wakeup(co) 来唤醒。区别在于 wakeup 只是向框架发送一个信号,需要等框架来调度;而不像 coroutine.resume(co) 会直接延续挂起的 coroutine 。
那么什么时候可能需要使用 coroutine 模块呢?
我认为最大的用途是把 coroutine 作为迭代器来使用。 PIL 里有一个不错的例子 。
正巧半年前我自己也遇到一个实际需求利用 coroutine 实现迭代器,下面分享一下这个案例(和 skynet 关系不大)。
由于 lua 可定制的内存管理器比标准 CRT 的内存管理 api 约定提供更多的信息,以及 lua vm 本身的内存取用工作模式可以被预知,所以有可能定制一个比通用分配器更好的内存管理器。(对于 skynet 还有额外的意义:可以让不同的 lua vm 使用不同区块的内存页,在 vm 关闭时减少内存碎片)
内存管理器写起来容易,想做的稳定放心且判断是否真的更高效却不那么容易。所以我想了一个办法来制作测试用的数据。
我在项目实际运行的环境里定制了一个特别的内存管理器,log 了所有的内存管理器调用行为。在实际项目运行相当长一段时间后,就得到了好多组,每组几 G 的数据。这些数据严格反应了真实项目在运行过程中内存是如何在使用的。
我可以用这些数据严格测试自定义的内存管理模块,它将和线上产品经受完全一致的使用。我可以加上额外的检查,即在分配出内存后填充独有的数据,并在释放的时候严格检查填充。还可以检测任意时间点的碎片率,峰值内存占用情况等等。当然也可以在关掉检测后和标准内存分配器在速度上一较高下。或是针对项目做算法微调以观成效。
只是,直接使用这份 log 数据并不容易。log 中记录的是每次内存配及释放的地址信息。如果我在测试代码中建一张大的 hash 表来动态保存它们有额外的开销,这个开销很可能会对内存分配器性能测量本身造成影响。因为内存分配模块本身运行速度就很快,甚至比 hash 表的实现要快。而且如果是个动态 hash 表,它本身也需要使用内存管理函数,这样干扰就更大了。
我倾向于在测试函数中建一个足够大的静态数组,把测试数据以流式读进去。理论上,数组的大小不会超过实际运行过程中同时存在的内存块条目数量。只要对原始 log 数据做一点处理,把内存地址转换为数组里的序号就好了。
测试程序读取处理过的 log ,只需要知道一条内存分配请求应该放在静态数组的第几项中,而释放请求应该去释放静态数组中的第几项就好了。这样对测试本身的干扰最小。
我的任务就是加工原始 log 。
一开始我想的比较简单,这些 log 也就转换一次而已,随便写个脚本算一下就好了。无非是把整个 log 加载到一个 lua table 里,比对一下地址,转换成编号。
实际做的时候我发现遇到了点小麻烦:log 文件实在是太大了,处理起来迅速超过了我的物理内存上限,变得非常缓慢。随后我想到了用一个 coroutine 做成迭代器,一边处理源数据流一边做转换,一边输出。
虽然不用 coroutine 也可以做到这点,但实现起来会麻烦许多。用 coroutine 完成这些需求非常的自然。如果你不介意读一下我当初随手写的脚本, 可以在 gist 上找到它们 。