转载

disruptor 源码解读

disruptor经过几年的发展,似乎已经成为性能优化的大杀器,几乎每个想优化性能的项目宣称自己用上了disruptor,性能都会呈现质的跃进。毕竟,最好的例子就是LMAX自己的架构设计,支撑了600w/s的吞吐。

本文试图从代码层面将关键问题做些解答。

基本概念

Disruptor: 实际上就是整个基于ringBuffer实现的生产者消费者模式的容器。

RingBuffer: 著名的环形队列,可以类比为BlockingQueue之类的队列,ringBuffer的使用,使得内存被循环使用,减少了某些场景的内存分配回收扩容等耗时操作。

disruptor 源码解读

EventProcessor: 事件处理器,实际上可以理解为消费者模型的框架,实现了线程Runnable的run方法,将循环判断等操作封在了里面。

disruptor 源码解读

EventHandler: 事件处置器,与前面处理器的不同是,事件处置器不负责框架内的行为,仅仅是EventProcessor作为消费者框架对外预留的扩展点罢了。

Sequencer: 作为RingBuffer生产者的父接口,其直接实现有SingleProducerSequencer和MultiProducerSequencer。

disruptor 源码解读

EventTranslator: 事件转换器。实际上就是新事件向旧事件覆盖的接口定义。

SequenceBarrier: 消费者路障。规定了消费者如何向下走。都说disruptor无锁,事实上,该路障算是变向的锁。

WaitStrategy: 当生产者生产得太快而消费者消费得太慢时的等待策略。

disruptor 源码解读

把上面几个关键概念画个图,大概长这样:

disruptor 源码解读

所以接下来主要也就从生产者,消费者以及ringBuffer3个维度去看disruptor是如何玩的。

生产者

生产者发布消息的过程从disruptor的publish方法为入口,实际调用了ringBuffer的publish方法。publish方法主要做了几件事,一是先确保能拿到后面的n个sequence;二是使用translator来填充新数据到相应的位置;三是真正的声明这些位置已经发布完成。

public void publishEvent(EventTranslator<E> translator)  
   {  
     final long sequence = sequencer.next();  
     translateAndPublish(translator, sequence);  
   }  
    public void publishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize)  
    {  
        checkBounds(translators, batchStartsAt, batchSize);  
        final long finalSequence = sequencer.next(batchSize);  
        translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence);  
    }

获取生产者下一个sequence的方法,细节已经注释,实际上最终目的就是确保生产者和消费者互相不越界。

public long next(int n)  
{  
    if (n < 1)  
    {  
        throw new IllegalArgumentException("n must be > 0");  
    }  
    //该生产者发布的最大序列号  
    long nextValue = this.nextValue;  
    //该生产者欲发布的序列号  
    long nextSequence = nextValue + n;  
    //覆盖点,即该生产者如果发布了这次的序列号,那它最终会落在哪个位置,实际上是nextSequence做了算术处理以后的值,最终目的是统一计算,否则就要去判绝对值以及取模等麻烦操作  
    long wrapPoint = nextSequence - bufferSize;  
    //所有消费者中消费得最慢那个的前一个序列号  
    long cachedGatingSequence = this.cachedValue;  
  
    //这里两个判断条件:一是看生产者生产是不是超过了消费者,所以判断的是覆盖点是否超过了最慢消费者;二是看消费者是否超过了当前生产者的最大序号,判断的是消费者是不是比生产者还快这种异常情况  
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)  
    {  
        cursor.setVolatile(nextValue);  // StoreLoad fence  
  
        long minSequence;  
        //覆盖点是不是已经超过了最慢消费者和当前生产者序列号的最小者(这两个有点难理解,实际上就是覆盖点不能超过最慢那个生产者,也不能超过当前自身,比如一次发布超过bufferSize),gatingSequences的处理也是类似算术处理,也可以看成是相对于原点是正还是负  
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))  
        {  
            //唤醒阻塞的消费者  
            waitStrategy.signalAllWhenBlocking();  
            //等上1纳秒  
            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?  
        }  
        //把这个最慢消费者缓存下来,以便下一次使用  
        this.cachedValue = minSequence;  
    }  
    //把当前序列号更新为欲发布序列号  
    this.nextValue = nextSequence;  
  
    return nextSequence;  
}

translator由用户在调用时自己实现,其实就是预留的一个扩展点,将覆盖事件预留出来。大部分实现都是将ByteBuffer复制到Event中,参考disruptor github官方例子。

最后声明新序列号发布完成,实际上就是设置了cursor,并且通知可能阻塞的消费者,这里已经发布完新的Event了,快来消费吧。

public void publish(long sequence)  
{  
    cursor.set(sequence);  
    waitStrategy.signalAllWhenBlocking();  
}

以上就是单生产者的分析,MultiProducerSequencer可以类似分析。

等待策略

等待策略实际上就是用来同步生产者和消费者的方法。SequenceBarrier只有一个实现ProcessingSequenceBarrier,中间就用到了WaitStrategy

BlockingWaitStrategy就是真正的加锁阻塞策略,采用的就是ReentrantLock以及Condition来控制阻塞与唤醒。

TimeoutBlockingWaitStrategy是BlockingWaitStrategy中条件带超时的版本。

LiteBlockingWaitStrategy是BlockingWaitStrategy的改进版,走了ReentrantLock和CAS轻量级锁结合的方式,不过注释说这算是实验性质的微性能改进。

BusySpinWaitStrategy算是一个自旋锁,其实现很有趣,即不停的调用Thread类的onSpinWait方法。

YieldingWaitStrategy是自旋锁的一种改进,自旋锁对于cpu来说太重,于是YieldingWaitStrategy先自旋100次,如果期间没有达成退出等待的条件,则主动让出cpu给其他线程作为惩罚。

SleepingWaitStrategy又是YieldingWaitStrategy的一种改进,SleepingWaitStrategy头100次先自旋,如果期间没有达成退出条件,则接下来100次主动让出cpu作为惩罚,如果还没有达成条件,则不再计数,每次睡1纳秒。

PhasedBackoffWaitStrategy相对复杂点,基本上是10000次自旋以后要么出让cpu,然后继续自旋,要么就采取新的等待策略。

消费者

EventProcessor是整个消费者事件处理框架,其主体就是线程的run方法,来看BatchEventProcessor,总体比较简单。

public void run()  
{  
    if (!running.compareAndSet(false, true))  
    {  
        throw new IllegalStateException("Thread is already running");  
    }  
    sequenceBarrier.clearAlert();  
  
    notifyStart();  
  
    T event = null;  
    long nextSequence = sequence.get() + 1L;  
    try  
    {  
        while (true)  
        {  
            try  
            {  
                //等待至少一个可用的sequence出来  
               final long availableSequence = sequenceBarrier.waitFor(nextSequence);  
                if (batchStartAware != null)  
                {  
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);  
                }  
                //一个一个消费事件  
                while (nextSequence <= availableSequence)  
                {  
                    //从ringBuffer里获取下一个事件  
                    event = dataProvider.get(nextSequence);  
                    //消费这个事件  
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);  
                    nextSequence++;  
                }  
                //当前的sequence推进到availableSequence  
                sequence.set(availableSequence);  
            }  
            catch (final TimeoutException e)  
            {  
                notifyTimeout(sequence.get());  
            }  
            catch (final AlertException ex)  
            {  
                if (!running.get())  
                {  
                    break;  
                }  
            }  
            catch (final Throwable ex)  
            {  
                exceptionHandler.handleEventException(ex, nextSequence, event);  
                sequence.set(nextSequence);  
                nextSequence++;  
            }  
        }  
    }  
    finally  
    {  
        notifyShutdown();  
        running.set(false);  
    }  
}

RingBuffer

RingBuffer这边代码比较简单,主要就是封装了一下发布的api

abstract class RingBufferFields<E> extends RingBufferPad  
{  
    private static final int BUFFER_PAD;  
    private static final long REF_ARRAY_BASE;  
    private static final int REF_ELEMENT_SHIFT;  
    private static final Unsafe UNSAFE = Util.getUnsafe();  
  
    static  
    {  
        final int scale = UNSAFE.arrayIndexScale(Object[].class);  
        if (4 == scale)  
        {  
            REF_ELEMENT_SHIFT = 2;  
        }  
        else if (8 == scale)  
        {  
            REF_ELEMENT_SHIFT = 3;  
        }  
        else  
        {  
            throw new IllegalStateException("Unknown pointer size");  
        }  
        // 如果scale是4, BUFFER_PAD则为32  
       BUFFER_PAD = 128 / scale;  
        // Including the buffer pad in the array base offset BUFFER_PAD<<REF_ELEMENT_SHIFT 实际上就是BUFFER_PAD * scale,最终算出来REF_ARRAY_BASE就是基地址  
        REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);  
    }  
  
    private final long indexMask;  
    private final Object[] entries;  
    protected final int bufferSize;  
    protected final Sequencer sequencer;  
  
    RingBufferFields(  
        EventFactory<E> eventFactory,  
        Sequencer sequencer)  
    {  
        this.sequencer = sequencer;  
        this.bufferSize = sequencer.getBufferSize();  
  
        if (bufferSize < 1)  
        {  
            throw new IllegalArgumentException("bufferSize must not be less than 1");  
        }  
        if (Integer.bitCount(bufferSize) != 1)  
        {  
            throw new IllegalArgumentException("bufferSize must be a power of 2");  
        }  
  
        this.indexMask = bufferSize - 1;  
        //bufferSize再加两倍的BUFFER_PAD大小,BUFFER_PAD分别在头尾  
       this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];  
        fill(eventFactory);  
    }  
  
    private void fill(EventFactory<E> eventFactory)  
    {  
        for (int i = 0; i < bufferSize; i++)  
        {  
            //初始化整个buffer  
            entries[BUFFER_PAD + i] = eventFactory.newInstance();  
        }  
    }  
  
    @SuppressWarnings("unchecked")  
    protected final E elementAt(long sequence)  
    {  
        //sequence & indexMask即对sequence取模, 最终算出来的就是基地址+偏移地址  
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));  
    }  
}

主体代码基本如上。其他代码可以自行参考。

下面介绍下一些常见问题。

1. disruptor应该如何用才能发挥最大功效?

disruptor原本就是事件驱动的设计,其整个架构跟普通的多线程很不一样。比如一种用法,将disruptor作为业务处理,中间带I/O处理,这种玩法比多线程还慢;相反,如果将disruptor做业务处理,需要I/O时采用nio异步调用,不阻塞disruptor消费者线程,等到I/O异步调用回来后在回调方法中将后续处理重新塞到disruptor队列中,可以看出来,这是典型的事件处理架构,确实能在时间上占据优势,加上ringBuffer固有的几项性能优化,能让disruptor发挥最大功效。

2. disruptor为啥这么快?

这个问题参考之前的一篇文章  disruptor框架为什么这么强大

3. 多生产者如何写入消息?

多生产者的消息写入实际上是通过availableBuffer与消费者来同步最后一个生产者写入的位置,这样,消费者永远不能超越最慢的那个生产者。见如下代码段

private void setAvailableBufferValue(int index, int flag)  
{  
    long bufferAddress = (index * SCALE) + BASE;  
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);  
}  
  
@Override  
public boolean isAvailable(long sequence)  
{  
    int index = calculateIndex(sequence);  
    int flag = calculateAvailabilityFlag(sequence);  
    long bufferAddress = (index * SCALE) + BASE;  
    return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;  
}  
  
@Override  
public long getHighestPublishedSequence(long lowerBound, long availableSequence)  
{  
    for (long sequence = lowerBound; sequence <= availableSequence; sequence++)  
    {  
        if (!isAvailable(sequence))  
        {  
            return sequence - 1;  
        }  
    }  
  
    return availableSequence;  
}

可以参考这篇文章 RingBuffer多生产者写入

4. 除了多个消费者重复处理生产者发送的消息,是否可以多消费者不重复处理生产者发送的消息,即各处理各的?

若要多消费者重复处理生产者的消息,则使用disruptor.handleEventsWith方法将消费者传入;而若要消费者不重复的处理生产者的消息,则使用disruptor.handleEventsWithWorkerPool方法将消费者传入。

原文  http://www.importnew.com/28687.html
正文到此结束
Loading...