转载

游戏中的定时器

写在前面

游戏中处处都有定时器,基本上每个逻辑部分我们都能看到定时器的影子。如果翻看一下以前网上流传的一些MMO的代码,比如mangos的,比如大唐的,比如天龙的,我们都可以看到形形色色的定时器实现。

在以前,很多程序员用起来C++还都是在用C with Object,以前的C++写callback也好异步也好总是感觉哪里不对劲的样子,所以网上流传的那种线上服务器的代码,一般都是往主循环里硬塞定时器逻辑。

定时器在很多能参考到的代码里都是逻辑和底层不做区分的,这样就会导致一些问题。

一方面,底层的需求是通用性。要通用性的话就必须得在主循环中轮询timeout,而不是借助一些更高层级的抽象;

另一方面,上层的需求是易用性。要易用性的话就必须得用起来方便,而且最好是能原生嵌入在一些常规的异步编程模型中的。最不济的,需要我很方便的挂callback。再高级点,我需要能yield。最上层的,能让我在描述一次lasy evaluation的计算中描述WaitForTime语义,做future什么的当然更好了。

但是,由于之前说到的,很多现成的都是底层上层不区分的,所以最常见的可能就是利用一种比较挫的观察者模式,比如继承一个Observable之类的东西,挂在主循环中。主循环轮询timeout,timeout了就callback之前注册进来的Observable。写起来真是要多蛋疼有多蛋疼。虽然说既照顾了上层,让上层能用callback了,算是温饱,也照顾了底层,底层写起来也是主循环来做timeout的,但是这样一来就只是一个扩展性非常差的Timer模块了。

当然,这篇文章不打算继续纠缠这种形而上的设计问题,上层的一些更高层次的抽象也不是这篇文章的重点,这里重点care下底层定时器机制的实现。

定时器实现

一般比较常见的定时器实现,其实就那么几种。

一种是比较容易能想到的,一个简单的最小堆,每次tick都查一下top的expire有没有timeout,timeout了就取出来,取出来再重复。

这种模型好处就是简单,找个学过数据结构的毕业生就能写出来,不容易有bug。但是有个比较致命的问题就是,如果短期内注册了大量timer,我add的时候需要nlgn,timeout的时候还需要nlgn。

所以网上后来就出现了铺天盖地的另一种定时器实现, linux内核中的timer实现 ,当然这内核里一坨坨的代码我估计是没人想看的,不重要的细枝末节把我们需要学习的精华地方完全遮住了,看看原理就可以了。或者看下 skynet_timer的实现 ,这里的还是比较浅显易懂的,可读性也很强。

这篇文章就重点来对比下这两种定时器的实现。下面代码都上C#了。

第一种。基于最小堆实现的,首先你要有一个最小堆,动手实现一下

1 public class PriorityQueue<T> : IEnumerable<T> 2 { 3     public PriorityQueue(IComparer<T> comparer); 4     public void Push(T v); 5     public T Pop(); 6     public T Top(); 7 }
1 public interface ITimeManager 2 { 3     ITimer AddTimer(uint afterTick, OnTimerTimeout callback, params object[] userData); 4     FixedTick(); 5 }

ps.增加这个Callback主要是为了方便跑测试用例。

1 public class TrivialTimeManager : ITimeManager 2 { 3     // ... 4 }

具体的实现就不用多说了。

然后是第二种。第二种思考方式需要有这样一个前提:

通过tick来定义整个系统的时间精度下限。比如游戏中其实都不是特别care 10ms以下的精度的,我们可以定义一个tick的长度为10ms。也就是说我先挂上去的WaitFor(8ms)和后上去的WaitFor(5ms),有可能是前者先timeout的。一个tick为10ms,那么一个32bit的tick能表达的时间粒度就有将近500天,远超过一个服务器组不重启的时间了。

如果有了这样的前提,就可以针对之前提到的、方法一面对大量临近tick的timer插入锁遇到的问题,做一些特殊的优化。

也就是根据tick直接拿到timeout链表,直接dispatch,拿到这个链表的时间是一个常数,而最小堆方法拿到这个链表需要的时间是m*lgn。

当然,由于空间有限,我们不可能做到每个将要timeout的tick都有对应的链表。考虑到其实80%以上的timer的时间都不会超过2.55s,我们只针对前256个tick做这种优化措施即可。

那注册进来一个256tick之后timeout的timer怎么办呢?我们可以把时间还比较长的timer放在更粗粒度的链表中,等到还剩下的tick数小于256之后再把他们取出来重新整理一下链表就能搞定。

如果我们保证每一次tick都严格的做到:

  • 未来256tick内的链表都能常数时间取到
  • 新加入的256tick以及更迟的timer才会加入到粗粒度链表

保证这两点,就需要每个tick都对所有链表做一次整理。这样就得不偿失了,所以这里有个trade-off,就是我通过一个指针(index),来标记我当前处理的position,每过256tick是一个cycle,才进行一次整理。而整理的成本就通过均摊在256tick中,降低了实际上的单位时间成本。

概念比较抽象,先来看下数据结构。

常量的定义

1 public const int TimeNearShift = 8; 2 public const int TimeNearNum = 1 << TimeNearShift;      // 256 3 public const int TimeNearMask = TimeNearNum - 1;        // 0x000000ff 4  5 public const int TimeLevelShift = 6; 6 public const int TimeLevelNum = 1 << TimeLevelShift;    // 64 7 public const int TimeLevelMask = TimeLevelNum - 1;      // 00 00 00 (0011 1111)

基础数据结构

1 using TimerNodes = LinkedList<TimerNode>; 2 private readonly TimerNodes[TimeNearNum] nearTimerNodes; 3 private readonly TimerNodes[4][TimeLevelNum] levelTimerNodes;

tick有32位,每一个tick只会timeout掉expire与index相同的timer。

循环不变式保证near表具有这样几个性质:

  •     第i个链表中的所有timer的expire,(expire >> 8) == (index >> 8) 且(expire & TimeNearMask) == i
  •     i小于(index & TimeNearMask)的链表,都已经AllTimeout

level表有4个,分别对应9到14bit,15到20bit,21到26bit,27到32bit。

由于原理都类似,我这里拿9到14bit的表来说下循环不变式:

  •     表中的所有64个链表,所有timer的expire的高18个bit一定是与index的高18个bit相等的
  •     第i个链表的元素的expire的9到14bit单独抽出来就是i
  •     i小于(index的9到14bit单独抽出来)的链表,都已经Shift

有了数据结构和循环不变式,后面的代码也就容易理解了。主要列一下AddTimer的逻辑和Shift逻辑。

 1 private void AddTimerNode(TimerNode node)  2 {  3     var expire = node.ExpireTick;  4   5     if (expire < index)  6     {  7         throw new Exception();  8     }  9  10     // expire 与 index 的高24bit相同 11     if ((expire | TimeNearMask) == (index | TimeNearMask)) 12     { 13         nearTimerNodes[expire & TimeNearMask].AddLast(node); 14     } 15     else 16     { 17         var shift = TimeNearShift; 18  19         for (int i = 0; i < 4; i++) 20         { 21             var lowerMask = (1 << (shift+TimeLevelShift))-1; 22              23             // expire 与 index 的高bit相同 24             // (24-6*(i+1)) 25             if ((expire | lowerMask) == (index | lowerMask)) 26             { 27                 // 取出[(8+i*6), (14+i*6))这段bits 28                 levelTimerNodes[i][(expire >> shift)&TimeLevelMask].AddLast(node); 29                 break; 30             } 31  32             shift += TimeLevelShift; 33         } 34     } 35 }
 1 private void TimerShift()  2 {  3     // TODO index回绕到0的情况暂时不考虑  4     index++;  5   6     var ct = index;  7   8     // mask0 : 8bit  9     // mask1 : 14bit 10     // mask2 : 20bit 11     // mask3 : 26bit 12     // mask4 : 32bit 13  14     var partialIndex = ct & TimeNearMask; 15  16     if (partialIndex != 0) 17     { 18         return; 19     } 20  21     ct >>= TimeNearShift; 22  23     for (int i = 0; i < 4; i++) 24     { 25         partialIndex = ct & TimeLevelMask; 26  27         if (partialIndex == 0) 28         { 29             ct >>= TimeLevelShift; 30             continue; 31         } 32  33         ReAddAll(levelTimerNodes[i], partialIndex); 34         break; 35     } 36 }

以上代码用c/c++重写后品尝风味更佳。

下面我们来测一下到底linux内核风格的定时器比最小堆的快了多少。

先是构造测试用例。我这里只考虑突然的来一大批timer,然后看所有都timeout需要消耗多久。

 1 static IEnumerable<TestCase> BuildTestCases(uint first, uint second)  2 {  3     var rand = new Random();  4   5     for (int i = 0; i < first; i++)  6     {  7         yield return new TestCase()  8         {  9             Tick = (uint)rand.Next(256), 10         }; 11     } 12  13     for (int i = 0; i < 4; i++) 14     { 15         var begin = 1U << (8 + 6*i); 16         var end = 1U << (14 + 6*i); 17  18         for (int j = 0; j < rand.Next((int)second * (4 - i)); j++) 19         { 20             yield return new TestCase() 21             { 22                 Tick = (uint)rand.Next((int)(begin+end)/2), 23             }; 24         } 25     } 26 }

构造测试用例

 1 static IEnumerable<TestCase> BuildTestCases(uint first, uint second)  2 {  3     var rand = new Random();  4   5     for (int i = 0; i < first; i++)  6     {  7         yield return new TestCase()  8         {  9             Tick = (uint)rand.Next(256), 10         }; 11     } 12  13     for (int i = 0; i < 4; i++) 14     { 15         var begin = 1U << (8 + 6*i); 16         var end = 1U << (14 + 6*i); 17  18         for (int j = 0; j < rand.Next((int)second * (4 - i)); j++) 19         { 20             yield return new TestCase() 21             { 22                 Tick = (uint)rand.Next((int)(begin+end)/2), 23             }; 24         } 25     } 26 }

测试函数

 1 {  2     var maxTick = cases.Max(c => c.Tick);  3     var results = new HashSet<uint>();  4   5     foreach (var c in cases)  6     {  7         TestCase c1 = c;  8         mgr.AddTimer(c.Tick, (timer, data) =>  9         { 10             if (mgr.FixedTicks == c1.Tick) 11                 results.Add((uint) data[0]); 12         }, c.Id); 13     } 14  15     var begin = DateTime.Now; 16     for (int i = 0; i < maxTick+1; i++) 17     { 18         mgr.FixedTick(); 19     } 20     var end = DateTime.Now; 21 }

看图得结论

游戏中的定时器

first固定为一千万,这个也是比较符合实际的情况,大量的timer都是2.5s以内的。可以看出随着远timer数量的增加,linux内核定时器对比最小堆定时器的优势是越来越小的。

游戏中的定时器

这个是固定远timer的数量,系数固定为1000。跟上图得到的结论差不多,近timer占比越高,相比最小堆定时器的优势越大。

总之,linux内核定时器比起最小堆定时器的优势还是很明显的,随便就能有2倍以上的性能表现,强烈建议采用。

去年刚来工作室的时候做了个skynet的源码阅读分享,当时也提到了里面定时器的实现,但是只看代码那肯定是记不住的,总得写一遍,后来也一直没抽出时间。直到前几天看到 一个答案 ,正好业余做的一个小东西开始需要时间模块了,就实现了下,顺便产出此小品文。

正文到此结束
Loading...