游戏中处处都有定时器,基本上每个逻辑部分我们都能看到定时器的影子。如果翻看一下以前网上流传的一些MMO的代码,比如mangos的,比如大唐的,比如天龙的,我们都可以看到形形色色的定时器实现。
在以前,很多程序员用起来C++还都是在用C with Object,以前的C++写callback也好异步也好总是感觉哪里不对劲的样子,所以网上流传的那种线上服务器的代码,一般都是往主循环里硬塞定时器逻辑。
定时器在很多能参考到的代码里都是逻辑和底层不做区分的,这样就会导致一些问题。
一方面,底层的需求是通用性。要通用性的话就必须得在主循环中轮询timeout,而不是借助一些更高层级的抽象;
另一方面,上层的需求是易用性。要易用性的话就必须得用起来方便,而且最好是能原生嵌入在一些常规的异步编程模型中的。最不济的,需要我很方便的挂callback。再高级点,我需要能yield。最上层的,能让我在描述一次lasy evaluation的计算中描述WaitForTime语义,做future什么的当然更好了。
但是,由于之前说到的,很多现成的都是底层上层不区分的,所以最常见的可能就是利用一种比较挫的观察者模式,比如继承一个Observable之类的东西,挂在主循环中。主循环轮询timeout,timeout了就callback之前注册进来的Observable。写起来真是要多蛋疼有多蛋疼。虽然说既照顾了上层,让上层能用callback了,算是温饱,也照顾了底层,底层写起来也是主循环来做timeout的,但是这样一来就只是一个扩展性非常差的Timer模块了。
当然,这篇文章不打算继续纠缠这种形而上的设计问题,上层的一些更高层次的抽象也不是这篇文章的重点,这里重点care下底层定时器机制的实现。
一般比较常见的定时器实现,其实就那么几种。
一种是比较容易能想到的,一个简单的最小堆,每次tick都查一下top的expire有没有timeout,timeout了就取出来,取出来再重复。
这种模型好处就是简单,找个学过数据结构的毕业生就能写出来,不容易有bug。但是有个比较致命的问题就是,如果短期内注册了大量timer,我add的时候需要nlgn,timeout的时候还需要nlgn。
所以网上后来就出现了铺天盖地的另一种定时器实现, linux内核中的timer实现 ,当然这内核里一坨坨的代码我估计是没人想看的,不重要的细枝末节把我们需要学习的精华地方完全遮住了,看看原理就可以了。或者看下 skynet_timer的实现 ,这里的还是比较浅显易懂的,可读性也很强。
这篇文章就重点来对比下这两种定时器的实现。下面代码都上C#了。
第一种。基于最小堆实现的,首先你要有一个最小堆,动手实现一下
1 public class PriorityQueue<T> : IEnumerable<T> 2 { 3 public PriorityQueue(IComparer<T> comparer); 4 public void Push(T v); 5 public T Pop(); 6 public T Top(); 7 }
1 public interface ITimeManager 2 { 3 ITimer AddTimer(uint afterTick, OnTimerTimeout callback, params object[] userData); 4 FixedTick(); 5 }
ps.增加这个Callback主要是为了方便跑测试用例。
1 public class TrivialTimeManager : ITimeManager 2 { 3 // ... 4 }
具体的实现就不用多说了。
然后是第二种。第二种思考方式需要有这样一个前提:
通过tick来定义整个系统的时间精度下限。比如游戏中其实都不是特别care 10ms以下的精度的,我们可以定义一个tick的长度为10ms。也就是说我先挂上去的WaitFor(8ms)和后上去的WaitFor(5ms),有可能是前者先timeout的。一个tick为10ms,那么一个32bit的tick能表达的时间粒度就有将近500天,远超过一个服务器组不重启的时间了。
如果有了这样的前提,就可以针对之前提到的、方法一面对大量临近tick的timer插入锁遇到的问题,做一些特殊的优化。
也就是根据tick直接拿到timeout链表,直接dispatch,拿到这个链表的时间是一个常数,而最小堆方法拿到这个链表需要的时间是m*lgn。
当然,由于空间有限,我们不可能做到每个将要timeout的tick都有对应的链表。考虑到其实80%以上的timer的时间都不会超过2.55s,我们只针对前256个tick做这种优化措施即可。
那注册进来一个256tick之后timeout的timer怎么办呢?我们可以把时间还比较长的timer放在更粗粒度的链表中,等到还剩下的tick数小于256之后再把他们取出来重新整理一下链表就能搞定。
如果我们保证每一次tick都严格的做到:
保证这两点,就需要每个tick都对所有链表做一次整理。这样就得不偿失了,所以这里有个trade-off,就是我通过一个指针(index),来标记我当前处理的position,每过256tick是一个cycle,才进行一次整理。而整理的成本就通过均摊在256tick中,降低了实际上的单位时间成本。
概念比较抽象,先来看下数据结构。
常量的定义
1 public const int TimeNearShift = 8; 2 public const int TimeNearNum = 1 << TimeNearShift; // 256 3 public const int TimeNearMask = TimeNearNum - 1; // 0x000000ff 4 5 public const int TimeLevelShift = 6; 6 public const int TimeLevelNum = 1 << TimeLevelShift; // 64 7 public const int TimeLevelMask = TimeLevelNum - 1; // 00 00 00 (0011 1111)
基础数据结构
1 using TimerNodes = LinkedList<TimerNode>; 2 private readonly TimerNodes[TimeNearNum] nearTimerNodes; 3 private readonly TimerNodes[4][TimeLevelNum] levelTimerNodes;
tick有32位,每一个tick只会timeout掉expire与index相同的timer。
循环不变式保证near表具有这样几个性质:
level表有4个,分别对应9到14bit,15到20bit,21到26bit,27到32bit。
由于原理都类似,我这里拿9到14bit的表来说下循环不变式:
有了数据结构和循环不变式,后面的代码也就容易理解了。主要列一下AddTimer的逻辑和Shift逻辑。
1 private void AddTimerNode(TimerNode node) 2 { 3 var expire = node.ExpireTick; 4 5 if (expire < index) 6 { 7 throw new Exception(); 8 } 9 10 // expire 与 index 的高24bit相同 11 if ((expire | TimeNearMask) == (index | TimeNearMask)) 12 { 13 nearTimerNodes[expire & TimeNearMask].AddLast(node); 14 } 15 else 16 { 17 var shift = TimeNearShift; 18 19 for (int i = 0; i < 4; i++) 20 { 21 var lowerMask = (1 << (shift+TimeLevelShift))-1; 22 23 // expire 与 index 的高bit相同 24 // (24-6*(i+1)) 25 if ((expire | lowerMask) == (index | lowerMask)) 26 { 27 // 取出[(8+i*6), (14+i*6))这段bits 28 levelTimerNodes[i][(expire >> shift)&TimeLevelMask].AddLast(node); 29 break; 30 } 31 32 shift += TimeLevelShift; 33 } 34 } 35 }
1 private void TimerShift() 2 { 3 // TODO index回绕到0的情况暂时不考虑 4 index++; 5 6 var ct = index; 7 8 // mask0 : 8bit 9 // mask1 : 14bit 10 // mask2 : 20bit 11 // mask3 : 26bit 12 // mask4 : 32bit 13 14 var partialIndex = ct & TimeNearMask; 15 16 if (partialIndex != 0) 17 { 18 return; 19 } 20 21 ct >>= TimeNearShift; 22 23 for (int i = 0; i < 4; i++) 24 { 25 partialIndex = ct & TimeLevelMask; 26 27 if (partialIndex == 0) 28 { 29 ct >>= TimeLevelShift; 30 continue; 31 } 32 33 ReAddAll(levelTimerNodes[i], partialIndex); 34 break; 35 } 36 }
以上代码用c/c++重写后品尝风味更佳。
下面我们来测一下到底linux内核风格的定时器比最小堆的快了多少。
先是构造测试用例。我这里只考虑突然的来一大批timer,然后看所有都timeout需要消耗多久。
1 static IEnumerable<TestCase> BuildTestCases(uint first, uint second) 2 { 3 var rand = new Random(); 4 5 for (int i = 0; i < first; i++) 6 { 7 yield return new TestCase() 8 { 9 Tick = (uint)rand.Next(256), 10 }; 11 } 12 13 for (int i = 0; i < 4; i++) 14 { 15 var begin = 1U << (8 + 6*i); 16 var end = 1U << (14 + 6*i); 17 18 for (int j = 0; j < rand.Next((int)second * (4 - i)); j++) 19 { 20 yield return new TestCase() 21 { 22 Tick = (uint)rand.Next((int)(begin+end)/2), 23 }; 24 } 25 } 26 }
构造测试用例
1 static IEnumerable<TestCase> BuildTestCases(uint first, uint second) 2 { 3 var rand = new Random(); 4 5 for (int i = 0; i < first; i++) 6 { 7 yield return new TestCase() 8 { 9 Tick = (uint)rand.Next(256), 10 }; 11 } 12 13 for (int i = 0; i < 4; i++) 14 { 15 var begin = 1U << (8 + 6*i); 16 var end = 1U << (14 + 6*i); 17 18 for (int j = 0; j < rand.Next((int)second * (4 - i)); j++) 19 { 20 yield return new TestCase() 21 { 22 Tick = (uint)rand.Next((int)(begin+end)/2), 23 }; 24 } 25 } 26 }
测试函数
1 { 2 var maxTick = cases.Max(c => c.Tick); 3 var results = new HashSet<uint>(); 4 5 foreach (var c in cases) 6 { 7 TestCase c1 = c; 8 mgr.AddTimer(c.Tick, (timer, data) => 9 { 10 if (mgr.FixedTicks == c1.Tick) 11 results.Add((uint) data[0]); 12 }, c.Id); 13 } 14 15 var begin = DateTime.Now; 16 for (int i = 0; i < maxTick+1; i++) 17 { 18 mgr.FixedTick(); 19 } 20 var end = DateTime.Now; 21 }
first固定为一千万,这个也是比较符合实际的情况,大量的timer都是2.5s以内的。可以看出随着远timer数量的增加,linux内核定时器对比最小堆定时器的优势是越来越小的。
这个是固定远timer的数量,系数固定为1000。跟上图得到的结论差不多,近timer占比越高,相比最小堆定时器的优势越大。
总之,linux内核定时器比起最小堆定时器的优势还是很明显的,随便就能有2倍以上的性能表现,强烈建议采用。
去年刚来工作室的时候做了个skynet的源码阅读分享,当时也提到了里面定时器的实现,但是只看代码那肯定是记不住的,总得写一遍,后来也一直没抽出时间。直到前几天看到 一个答案 ,正好业余做的一个小东西开始需要时间模块了,就实现了下,顺便产出此小品文。