转载

Disruptor源码系列-Sequencer

上篇文章已经讲过了 RingBuffer 了, RingBuffer 是消息的容器,但是 Disruptor 中最复杂的部分在于如何并发控制消息的增加和消费,而这部分由 Senquencer 来完成。

这篇文章基于 Disruptor 官方提供的示例代码。

Sequencer 简介

Sequencer 可以被认为是 Disruptor 的大脑,而 Sequence 则可以认为是神经元,Sequencer 会产生信号(Sequence 中的 value)来控制消费者和生产者。在一个 Disruptor 实例中,只会有一个 Sequencer 实例,在创建 RingBuffer 时创建。

// 多个生产者的情况
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy)
{
    MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
    return new RingBuffer<E>(factory, sequencer);
}
// 单个生产者的情况
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy)
{
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
    return new RingBuffer<E>(factory, sequencer);
}
复制代码

Sequencer 接口有两种实现, SingleProducerSequencerMultiProducerSequencer ,分别来处理单个生产者和多个生产者的情况。

在 Sequencer 中有一个 next() 方法,就是这个方法来产生 Sequence 中的 value。Sequence 本质上可以认为是一个 AtomicLong,消费者和生产者都会维护自己的 Sequence。

Sequence 中的 value 表示 RingBuffer 消息的编号,Disruptor 中控制逻辑都是围绕这个编号来完成的。RingBuffer 的 sequence 从 0 开始增长。这里需要注意的是在 Disruptor 中共享的并不是 Sequence 对象,而是 sequence 中的 value。

生产者中 Sequence 的 value 表示当前消息已经生产到哪个位置, 消费者 中 Sequence 的 value 表示消费者已经处理到哪个位置。对于 Sequencer 和 Sequence 已经介绍清楚了,那么 Sequencer 是怎么运行的呢?

RingBuffer 是消息的容器,为了让消息能够被正常传递,RingBuffer 需要满足两个要求,第一个是对于所有的消费者,在 RingBuffer 为空时,就不能再从中取数据,对于生产者,新生产的内容不能把未消费的数据覆盖掉。

Sequencer 的核心就是解决了这两个问题,通过 GatingBarrier 两个工具。

Gating 通过 RingBuffer.addGatingSequences() 方法来获取,Barrier 通过 RingBuffer.newBarrier() 方法来获取。

Disruptor源码系列-Sequencer

上图中 C 代表消费者,P 代表生产者。

需要说明的是,EventProcessor + EventHandler 才是一个完整的消费者。EventProcessor 中会维护一个 Sequence 对象,记录该消费者处理到哪条消息,每个消费者维护自己的 Sequence 生产者的 Sequence 在 RingBuffer 维护

Gating 实现

Gating 的设计其实很简单,其实就是将多个所有消费者的 Sequence 监控起来,然后在生产者向 RingBuffer 中写入数据时,判断是否有足够的空间来存入新的消息。

所有消费者的 Sequence 通过如下的方法调用路径,最后存入到 Sequencer.gatingSequences 变量中。

Disruptor.handleEventsWith() -> RingBuffer.addGatingSequences() -> Sequencer.addGatingSequences()

Sequencer.next() 中会对 gatingSequences 进行判断,具体判断的逻辑就是看当前这些被监控的 Sequence 中最小的 value 是否已经落后一圈了,落后一圈就表示新的消息没有写入的空间:

// MultiProducerSequencer.next() 方法
do
{
    current = cursor.get();
    next = current + n;

    long wrapPoint = next - bufferSize; // 获取一圈之前的值
    long cachedGatingSequence = gatingSequenceCache.get(); // 获取缓存的 gatingSequences 的最小值
    // 如果大于缓存的值,则进行进一步的判断
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
    {
        // 获取当前实际最小的 sequence
        long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
        // 如果比实际的最小 sequqnce 还大,说明已经没有位置了,则继续进行自旋(无限循环)
        if (wrapPoint > gatingSequence)
        {
            LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
            continue;
        }
        gatingSequenceCache.set(gatingSequence);
    }
    else if (cursor.compareAndSet(current, next)) // 如果比缓存的值小,说明目前还有空闲位置,可以继续存入消息
    {
        break;
    }
}
while (true); // 这是一个无限循环,直到有新的空间可以存入消息
复制代码

如果没有足够的空间,那么 next() 方法就会被阻塞,新的消息无法加入到 RingBuffer 中。

Disruptor源码系列-Sequencer

上面是 gating 的示意图,c1 和 c2 处理的速度不一样,c1 在 1 的位置上,而 c2 在 2 的位置上,生产者 P 已经无法在向 RingBuffer 中添加新的消息,因此会被阻塞,直到 c1 将 消息处理完成之后才能继续插入消息。

SequencerBarrier 实现

同时对于消费者来说,必须等到 RingBuffer 中有消息才能进行处理。 通过 SequenceBarrier 来进行管理, SequenceBarrier 实际生成的是 ProcessingSequenceBarrier 实例,按照如下的调用路径来初始化:

RingBuffer.newBarrier() -> Sequencer.newBarrier() -> new ProcessingSequenceBarriser()

消费者从 RingBuffer 中获取消息时,需要通过 SenquencerBarrier 来确定是否有可用的消息, 使用 SequencerBarrier 的调用路径如下:

BatchEventProcessor.processEvents() -> sequenceBarrier.waitFor()

BatchEventProcessor 是默认使用的消费者,上面我们说到了 EventProcessor + EventHandler 才是一个完整的消费者。用户自己实现 EventHandler 来处理消息的逻辑。而实际从 RingBuffer 中获取消息的逻辑则在 BatchEventProcessor 中实现,关键代码如下:

// BatchEventProcessor.processEvents() 方法,删除了部分代码
while (true)
{
    try
    {
        // 获取可用消息的最大值
        final long availableSequence = sequenceBarrier.waitFor(nextSequence);
        // 如果当前的位置小于可用的位置,说明有消息可以处理,进行消息处理
        while (nextSequence <= availableSequence)
        {
            event = dataProvider.get(nextSequence);
            eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); // 调用实际的 Handler 处理消息
            nextSequence++;
        }
        sequence.set(availableSequence); // 将自己的 sequence 设置处理完成的位置
    }
}
复制代码

如果没有获取到可处理的 sequence, 那么当前的处理消息的 handlers 也会被阻塞。

Disruptor源码系列-Sequencer

SequenceBarrier 除了可以控制消费者从 RingBuffer 取数据之外,还可以控制多个消费者执行的顺序。如果要安排消费者执行的顺序,用如下的代码就可以。

disruptor.handleEventsWith(new LongEventHandler()).then(new AnOtherLongEventHandler());
复制代码

上面的代码表示 AnotherLongEventHandler 需要等 LongEventHandler 处理完成之后,才能对消息进行处理。

消费者之间控制依赖关系其实就是控制 sequence 的大小,如果说 C2 消费者 依赖 C1,那就表示 C2 中 Sequence 的值一定小于等于 C1 的 Sequence。

其中 then 关系是通过 Disruptor.updateGatingSequencesForNextInChain() 方法来实现:

private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
    if (processorSequences.length > 0)
    {
        ringBuffer.addGatingSequences(processorSequences);
        for (final Sequence barrierSequence : barrierSequences)
        {
            ringBuffer.removeGatingSequence(barrierSequence);
        }
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }
}
复制代码

其实 Disruptor 控制的秘密就是这些了,其实也不是很复杂,只是实现的方式很巧妙,再加上并发控制没有使用锁,才造就了一个如此高效的框架。

关注微信公众号,聊点其他的

Disruptor源码系列-Sequencer
原文  https://juejin.im/post/5eb81c666fb9a0435f09438a
正文到此结束
Loading...