disruptor经过几年的发展,似乎已经成为性能优化的大杀器,几乎每个想优化性能的项目宣称自己用上了disruptor,性能都会呈现质的跃进。毕竟,最好的例子就是LMAX自己的架构设计,支撑了600w/s的吞吐。
本文试图从代码层面将关键问题做些解答。
Disruptor: 实际上就是整个基于ringBuffer实现的生产者消费者模式的容器。
RingBuffer: 著名的环形队列,可以类比为BlockingQueue之类的队列,ringBuffer的使用,使得内存被循环使用,减少了某些场景的内存分配回收扩容等耗时操作。
EventProcessor: 事件处理器,实际上可以理解为消费者模型的框架,实现了线程Runnable的run方法,将循环判断等操作封在了里面。
EventHandler: 事件处置器,与前面处理器的不同是,事件处置器不负责框架内的行为,仅仅是EventProcessor作为消费者框架对外预留的扩展点罢了。
Sequencer: 作为RingBuffer生产者的父接口,其直接实现有SingleProducerSequencer和MultiProducerSequencer。
EventTranslator: 事件转换器。实际上就是新事件向旧事件覆盖的接口定义。
SequenceBarrier: 消费者路障。规定了消费者如何向下走。都说disruptor无锁,事实上,该路障算是变向的锁。
WaitStrategy: 当生产者生产得太快而消费者消费得太慢时的等待策略。
把上面几个关键概念画个图,大概长这样:
所以接下来主要也就从生产者,消费者以及ringBuffer3个维度去看disruptor是如何玩的。
生产者发布消息的过程从disruptor的publish方法为入口,实际调用了ringBuffer的publish方法。publish方法主要做了几件事,一是先确保能拿到后面的n个sequence;二是使用translator来填充新数据到相应的位置;三是真正的声明这些位置已经发布完成。
public void publishEvent(EventTranslator<E> translator) { final long sequence = sequencer.next(); translateAndPublish(translator, sequence); } public void publishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize) { checkBounds(translators, batchStartsAt, batchSize); final long finalSequence = sequencer.next(batchSize); translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence); }
获取生产者下一个sequence的方法,细节已经注释,实际上最终目的就是确保生产者和消费者互相不越界。
public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } //该生产者发布的最大序列号 long nextValue = this.nextValue; //该生产者欲发布的序列号 long nextSequence = nextValue + n; //覆盖点,即该生产者如果发布了这次的序列号,那它最终会落在哪个位置,实际上是nextSequence做了算术处理以后的值,最终目的是统一计算,否则就要去判绝对值以及取模等麻烦操作 long wrapPoint = nextSequence - bufferSize; //所有消费者中消费得最慢那个的前一个序列号 long cachedGatingSequence = this.cachedValue; //这里两个判断条件:一是看生产者生产是不是超过了消费者,所以判断的是覆盖点是否超过了最慢消费者;二是看消费者是否超过了当前生产者的最大序号,判断的是消费者是不是比生产者还快这种异常情况 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { cursor.setVolatile(nextValue); // StoreLoad fence long minSequence; //覆盖点是不是已经超过了最慢消费者和当前生产者序列号的最小者(这两个有点难理解,实际上就是覆盖点不能超过最慢那个生产者,也不能超过当前自身,比如一次发布超过bufferSize),gatingSequences的处理也是类似算术处理,也可以看成是相对于原点是正还是负 while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { //唤醒阻塞的消费者 waitStrategy.signalAllWhenBlocking(); //等上1纳秒 LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? } //把这个最慢消费者缓存下来,以便下一次使用 this.cachedValue = minSequence; } //把当前序列号更新为欲发布序列号 this.nextValue = nextSequence; return nextSequence; }
translator由用户在调用时自己实现,其实就是预留的一个扩展点,将覆盖事件预留出来。大部分实现都是将ByteBuffer复制到Event中,参考disruptor github官方例子。
最后声明新序列号发布完成,实际上就是设置了cursor,并且通知可能阻塞的消费者,这里已经发布完新的Event了,快来消费吧。
public void publish(long sequence) { cursor.set(sequence); waitStrategy.signalAllWhenBlocking(); }
以上就是单生产者的分析,MultiProducerSequencer可以类似分析。
等待策略实际上就是用来同步生产者和消费者的方法。SequenceBarrier只有一个实现ProcessingSequenceBarrier,中间就用到了WaitStrategy
BlockingWaitStrategy就是真正的加锁阻塞策略,采用的就是ReentrantLock以及Condition来控制阻塞与唤醒。
TimeoutBlockingWaitStrategy是BlockingWaitStrategy中条件带超时的版本。
LiteBlockingWaitStrategy是BlockingWaitStrategy的改进版,走了ReentrantLock和CAS轻量级锁结合的方式,不过注释说这算是实验性质的微性能改进。
BusySpinWaitStrategy算是一个自旋锁,其实现很有趣,即不停的调用Thread类的onSpinWait方法。
YieldingWaitStrategy是自旋锁的一种改进,自旋锁对于cpu来说太重,于是YieldingWaitStrategy先自旋100次,如果期间没有达成退出等待的条件,则主动让出cpu给其他线程作为惩罚。
SleepingWaitStrategy又是YieldingWaitStrategy的一种改进,SleepingWaitStrategy头100次先自旋,如果期间没有达成退出条件,则接下来100次主动让出cpu作为惩罚,如果还没有达成条件,则不再计数,每次睡1纳秒。
PhasedBackoffWaitStrategy相对复杂点,基本上是10000次自旋以后要么出让cpu,然后继续自旋,要么就采取新的等待策略。
EventProcessor是整个消费者事件处理框架,其主体就是线程的run方法,来看BatchEventProcessor,总体比较简单。
public void run() { if (!running.compareAndSet(false, true)) { throw new IllegalStateException("Thread is already running"); } sequenceBarrier.clearAlert(); notifyStart(); T event = null; long nextSequence = sequence.get() + 1L; try { while (true) { try { //等待至少一个可用的sequence出来 final long availableSequence = sequenceBarrier.waitFor(nextSequence); if (batchStartAware != null) { batchStartAware.onBatchStart(availableSequence - nextSequence + 1); } //一个一个消费事件 while (nextSequence <= availableSequence) { //从ringBuffer里获取下一个事件 event = dataProvider.get(nextSequence); //消费这个事件 eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } //当前的sequence推进到availableSequence sequence.set(availableSequence); } catch (final TimeoutException e) { notifyTimeout(sequence.get()); } catch (final AlertException ex) { if (!running.get()) { break; } } catch (final Throwable ex) { exceptionHandler.handleEventException(ex, nextSequence, event); sequence.set(nextSequence); nextSequence++; } } } finally { notifyShutdown(); running.set(false); } }
RingBuffer这边代码比较简单,主要就是封装了一下发布的api
abstract class RingBufferFields<E> extends RingBufferPad { private static final int BUFFER_PAD; private static final long REF_ARRAY_BASE; private static final int REF_ELEMENT_SHIFT; private static final Unsafe UNSAFE = Util.getUnsafe(); static { final int scale = UNSAFE.arrayIndexScale(Object[].class); if (4 == scale) { REF_ELEMENT_SHIFT = 2; } else if (8 == scale) { REF_ELEMENT_SHIFT = 3; } else { throw new IllegalStateException("Unknown pointer size"); } // 如果scale是4, BUFFER_PAD则为32 BUFFER_PAD = 128 / scale; // Including the buffer pad in the array base offset BUFFER_PAD<<REF_ELEMENT_SHIFT 实际上就是BUFFER_PAD * scale,最终算出来REF_ARRAY_BASE就是基地址 REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT); } private final long indexMask; private final Object[] entries; protected final int bufferSize; protected final Sequencer sequencer; RingBufferFields( EventFactory<E> eventFactory, Sequencer sequencer) { this.sequencer = sequencer; this.bufferSize = sequencer.getBufferSize(); if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.indexMask = bufferSize - 1; //bufferSize再加两倍的BUFFER_PAD大小,BUFFER_PAD分别在头尾 this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]; fill(eventFactory); } private void fill(EventFactory<E> eventFactory) { for (int i = 0; i < bufferSize; i++) { //初始化整个buffer entries[BUFFER_PAD + i] = eventFactory.newInstance(); } } @SuppressWarnings("unchecked") protected final E elementAt(long sequence) { //sequence & indexMask即对sequence取模, 最终算出来的就是基地址+偏移地址 return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)); } }
主体代码基本如上。其他代码可以自行参考。
下面介绍下一些常见问题。
disruptor原本就是事件驱动的设计,其整个架构跟普通的多线程很不一样。比如一种用法,将disruptor作为业务处理,中间带I/O处理,这种玩法比多线程还慢;相反,如果将disruptor做业务处理,需要I/O时采用nio异步调用,不阻塞disruptor消费者线程,等到I/O异步调用回来后在回调方法中将后续处理重新塞到disruptor队列中,可以看出来,这是典型的事件处理架构,确实能在时间上占据优势,加上ringBuffer固有的几项性能优化,能让disruptor发挥最大功效。
这个问题参考之前的一篇文章 disruptor框架为什么这么强大
多生产者的消息写入实际上是通过availableBuffer与消费者来同步最后一个生产者写入的位置,这样,消费者永远不能超越最慢的那个生产者。见如下代码段
private void setAvailableBufferValue(int index, int flag) { long bufferAddress = (index * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag); } @Override public boolean isAvailable(long sequence) { int index = calculateIndex(sequence); int flag = calculateAvailabilityFlag(sequence); long bufferAddress = (index * SCALE) + BASE; return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag; } @Override public long getHighestPublishedSequence(long lowerBound, long availableSequence) { for (long sequence = lowerBound; sequence <= availableSequence; sequence++) { if (!isAvailable(sequence)) { return sequence - 1; } } return availableSequence; }
可以参考这篇文章 RingBuffer多生产者写入
4. 除了多个消费者重复处理生产者发送的消息,是否可以多消费者不重复处理生产者发送的消息,即各处理各的?
若要多消费者重复处理生产者的消息,则使用disruptor.handleEventsWith方法将消费者传入;而若要消费者不重复的处理生产者的消息,则使用disruptor.handleEventsWithWorkerPool方法将消费者传入。