关于 Disruptor,网络上有很多的解释和说法。这里简单的概括下。Disruptor 是一个消费者生产者队列框架,据官网介绍,可以提供非常强大的性能。Disruptor 与其说为我们带来了一个框架,更多的是为我们带来了一个独特思路的编程实践。总结来说大致有3点。
上面的三点是在 Disruptor 中带来的一些技巧。有些是常用的,有些是实现起来比较独特的。
生产者消费者模型自然是离不开队列的。但是使用传统的队列,面对并发等问题,在性能上是否已经足够的高效?或者说是否有其他的办法来进一步的提高性能。Disruptor 为我们提供了一个思路和实践(这个思路不是 Disruptor 首创,但是他们提供了一个好的完整实践)
定义一个数组,长度为2的次方幂(因为计算机是二进制的,所以2次方幂可以进行并运算来代替取模运算)。设定一个数字标志表示当前的可用的位置(可以从0开始)。当这个数字标志不断增长到大于数组长度时进行与数组长度的并运算,得到的新数字依然在数组的长度范围内,就又可以插入。这样就好像一直插入直到数组末尾又再次从头开始,故而称之为循环数组。
一般的循环数组有头尾两个标志位。这点和队列很像。头标志位表示下一个可以插入的位置,尾标志位表示下一个可以读取的位置。头标志位不能大于尾标志位一个数组长度(因为这样就插入的位置和读取的位置就重叠了会导致数据丢失),尾标志位不能等于头标志位(因为这样读取的数据实际上是上一轮的旧数据)
我们知道在java中如果创造大量的对象使用后弃用,JVM 会在适当的时候进行 GC 操作。大量的对象 GC 操作是很消耗时间的。所以如果能够避免 GC 也可以提高性能,特别是在数据交互非常频繁的时候。
在循环数组中,可以事先在数组中填充好数据。一旦有新数据的产生,要做的就是修改数组中某一位中的一些属性值。这样可以避免频繁创建数据和弃用数据导致的 GC。这点比起队列是要好的。
多线程在队列也好,循环数组也好,必然存在对标志位的竞争。无论是使用锁来避免竞争,还是使用 CAS 来进行无锁算法。只要争用的情况存在,并且线程较多,都会出现对资源的不断消耗。争用的对象越多,争用中消耗掉的资源也就越多。为了避免这样的情况,减少争用的资源就是一个手段。比如在循环数组中只保留一个标志位,也就是下一个可以写入数据位置的标志位。而尾部标志位则在各个消费者线程中保存(具体的编程手法后续细讲)。
如果确定只有一个生产者,也就是说只有一个写线程。则在循环数组中的使用会更加简化。具体来说单线程更新数组上的标志位,那这种情况,标志位就无需采用 CAS 写的方式来确定下一个可写入的位置,直接就是在单线程内进行普通的更新即可。
如果存在多个生产者,则可写入的标志位需要用 CAS 算法来进行争夺,避免锁的使用。多个线程通过 CAS 得到唯一的不冲突的下一个可写序号。由于需要获得序号后才能进行写入,而写入完成才可以让消费者线程进行消费。所以才获得序号后,完成写入前,必须有一种方式让消费者检测是否完成。以避免消费者拿到还未填入输入的数组位。
为了达到这个目标,存在简单—效率低和复杂—效率高两种方式。
使用两个标志位。
多个生产者通过 CAS 获得 prePut 的不同的值。在获得的序号并且完成数据写入后,将 put 的值以 CAS 方式递增(比如获得的序号是7,只有 put 是6的时候才允许设置成功),称之为发布。这种方式存在 一个缺点 ,如果多个线程并发写入,获取 prePut 的值不会堵塞,假设其中一个生产者在写入数据的时候稍慢,则其他的线程写入完毕也无法完成发布。就会导致循环等待,浪费了 CPU 性能。
在上面的方式中,主要的争夺环节集中在多线程发布中,序号大的线程发布需要等到序号小的线程发布完成后才能发布。那我们的优化的点也在这个地方。如果只有一个地方可以写入完成信息,必然需要争夺。为了避免争夺,我们可以使用标志数组(长度和内容数组相同,每一位表示相同下标的内容数组是否发布)来表示每一个位置是否写入。这样就可以避免发布的争夺(大家的标志位都不在一起了)。
但是又来带来一个问题,用什么数字来表示是否已经发布完成?如果只是0和1,那么写过1轮以后,标志数组位上就都是1了。又无法区分。所以标志数组上的数字应该在循环数组的每一轮循环的值都不同。比如一开始都是-1,第一轮中是0的表示已发布,第二轮中是0表示没发布,是1的表示已发布。下面的是发布的算法步骤:
一般在软件编程中,很少有工程师会关注一些硬件的信息。不过如果追求性能达到极致,那么对于一些硬件知识的了解就成了必要。这其中CPU 缓存的知识会神奇的提高我们的程序性能。
在编程上,网络关于 CPU 缓存的知识介绍很多。这里简单说下。在硬件中,CPU 存在着多级缓存的结果,越接近 CPU 的缓存容量越小,速度越快。每一个物理内核都有自己的缓存体系。不同的 CPU 之间通过缓存嗅探协议来确定缓存中的数据是否已经失效。如果失效了,CPU 会去内存中读取数据,并且将最新的数据在特定指令的帮助下写入到内存中。
CPU 缓存是以行为单位进行存取的。以前的 CPU 是32个字节一行,现在则是64个字节一行。因为这种行存取的方式,所以称之为缓存行。如果一个对象中不同属性在多线程中被频繁更新,会导致一个问题: 由于在同一个缓存行中的不相关变量的更新导致整个缓存行失效 。缓存行失效后 CPU 就只好到主存中重新读取数据。这个问题在并发队列中特别明显。为了修正这个问题,JDK 7 中特意提供了 transferqueue 来解决这个问题。
既然问题的发生是因为同一个缓存行中有不相关的变量被更新导致缓存行需要的数据一起失效,那么解决的办法就是让这个频繁被更新的变量独占一个缓存行即可。也就是剩下的位置就用无关数据填充。这样就保证了关键变量不会因为其他变量的更新而失效。具体的填充方式,就是在一个 Java 对象中设定无意义的变量,根据变量的长度来计算需要的个数。以下是示例代码:
//现在一般的cpu架构都是64个字节的缓存行,针对这个情况,缓存行填充可以如下进行 class LeftPad { protected long p1, p2, p3, p4, p5, p6, p7; } class RealValue extends LeftPad { protected volatile long point = -1; // 前后都有7个元素填充,可以保证该核心变量独自在一个缓存行中 } class RightPad extends RealValue { protected long p9, p10, p11, p12, p13, p14, p15; } public class CpuCachePadingValue extends RightPad { }
Disruptor 整个框架的实现过程都在尽量的减少对锁的使用。比如生产者消费者中最容易出现争夺的,其实就是其中的消息队列。那么对于这个消息队列,我们可以采用的优化手段包括
针对上面的第二点详细展开说一下。一般来说,队列中信息的处理有两种不同的形式,第一种是这个消息需要所有消费者都处理完毕,才能认为是被使用好了。第二种是争夺到使用权的消费者线程进行消费,其他消费者线程争夺下一个。无论哪一种,都可以将消费者已经处理的序号保存在消费者线程内。而如果信息只允许被一个线程消费,可以在内部使用 CAS 来争夺。而生产者线程则需要持有消费者的类的信息,好用来判断所有消费者中消费的最小的序号,以避免在数据写入时覆盖了某个消费者尚未处理的数据信息。
Disruptor 可以让不同的消费者按照一定的顺序进行消息处理。比如一个消息,必须先经过日志处理 A1 保存日志,数据转换处理器 A2 清理才能最终被业务处理器 A3 进行实际的业务处理。而 A1 和 A2 并没有任何前后关系,但是 A3 必须等 A1 和 A2 都完成后才能进行。那么在实际编码时,可以让 A3 追踪 A1 和 A2 的处理序号。所有的消费者都在等待队列中可用序号达到自己需要的序号,一旦到达,排位靠后的处理器就循环检测排位靠前的处理器是否已经将数据处理完毕,处理完毕之后自己开始对数据的处理。
Disruptor 这个框架在整个的编码过程中一直都在体现本地缓存数据,使用 CAS 来代替锁,尽可能无锁甚至无 CAS 这样的一种编程思想。根据官网的说明,这样的编码思想是在他们追求多线程以提高性能遇到失败后(项目复杂性、可测试性、维护性等),回过头思考在单线程下的性能可能性(单线程无锁必然是性能最高的,但是吞吐量就有待商榷)。