上篇文章已经讲过了 RingBuffer 了, RingBuffer 是消息的容器,但是 Disruptor 中最复杂的部分在于如何并发控制消息的增加和消费,而这部分由 Senquencer 来完成。
这篇文章基于 Disruptor 官方提供的示例代码。
Sequencer 可以被认为是 Disruptor 的大脑,而 Sequence 则可以认为是神经元,Sequencer 会产生信号(Sequence 中的 value)来控制消费者和生产者。在一个 Disruptor 实例中,只会有一个 Sequencer 实例,在创建 RingBuffer 时创建。
// 多个生产者的情况 public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) { MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy); return new RingBuffer<E>(factory, sequencer); } // 单个生产者的情况 public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) { SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy); return new RingBuffer<E>(factory, sequencer); } 复制代码
Sequencer 接口有两种实现, SingleProducerSequencer
和 MultiProducerSequencer
,分别来处理单个生产者和多个生产者的情况。
在 Sequencer 中有一个 next()
方法,就是这个方法来产生 Sequence 中的 value。Sequence 本质上可以认为是一个 AtomicLong,消费者和生产者都会维护自己的 Sequence。
Sequence 中的 value 表示 RingBuffer 消息的编号,Disruptor 中控制逻辑都是围绕这个编号来完成的。RingBuffer 的 sequence 从 0 开始增长。这里需要注意的是在 Disruptor 中共享的并不是 Sequence 对象,而是 sequence 中的 value。
生产者中 Sequence 的 value 表示当前消息已经生产到哪个位置, 消费者 中 Sequence 的 value 表示消费者已经处理到哪个位置。对于 Sequencer 和 Sequence 已经介绍清楚了,那么 Sequencer 是怎么运行的呢?
RingBuffer 是消息的容器,为了让消息能够被正常传递,RingBuffer 需要满足两个要求,第一个是对于所有的消费者,在 RingBuffer 为空时,就不能再从中取数据,对于生产者,新生产的内容不能把未消费的数据覆盖掉。
Sequencer 的核心就是解决了这两个问题,通过 Gating
和 Barrier
两个工具。
Gating 通过 RingBuffer.addGatingSequences()
方法来获取,Barrier 通过 RingBuffer.newBarrier()
方法来获取。
上图中 C 代表消费者,P 代表生产者。
需要说明的是,EventProcessor + EventHandler 才是一个完整的消费者。EventProcessor 中会维护一个 Sequence 对象,记录该消费者处理到哪条消息,每个消费者维护自己的 Sequence 生产者的 Sequence 在 RingBuffer 维护
Gating 的设计其实很简单,其实就是将多个所有消费者的 Sequence 监控起来,然后在生产者向 RingBuffer 中写入数据时,判断是否有足够的空间来存入新的消息。
所有消费者的 Sequence 通过如下的方法调用路径,最后存入到 Sequencer.gatingSequences 变量中。
Disruptor.handleEventsWith() -> RingBuffer.addGatingSequences() -> Sequencer.addGatingSequences()
Sequencer.next() 中会对 gatingSequences 进行判断,具体判断的逻辑就是看当前这些被监控的 Sequence 中最小的 value 是否已经落后一圈了,落后一圈就表示新的消息没有写入的空间:
// MultiProducerSequencer.next() 方法 do { current = cursor.get(); next = current + n; long wrapPoint = next - bufferSize; // 获取一圈之前的值 long cachedGatingSequence = gatingSequenceCache.get(); // 获取缓存的 gatingSequences 的最小值 // 如果大于缓存的值,则进行进一步的判断 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { // 获取当前实际最小的 sequence long gatingSequence = Util.getMinimumSequence(gatingSequences, current); // 如果比实际的最小 sequqnce 还大,说明已经没有位置了,则继续进行自旋(无限循环) if (wrapPoint > gatingSequence) { LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? continue; } gatingSequenceCache.set(gatingSequence); } else if (cursor.compareAndSet(current, next)) // 如果比缓存的值小,说明目前还有空闲位置,可以继续存入消息 { break; } } while (true); // 这是一个无限循环,直到有新的空间可以存入消息 复制代码
如果没有足够的空间,那么 next() 方法就会被阻塞,新的消息无法加入到 RingBuffer 中。
上面是 gating 的示意图,c1 和 c2 处理的速度不一样,c1 在 1 的位置上,而 c2 在 2 的位置上,生产者 P 已经无法在向 RingBuffer 中添加新的消息,因此会被阻塞,直到 c1 将 消息处理完成之后才能继续插入消息。
同时对于消费者来说,必须等到 RingBuffer 中有消息才能进行处理。 通过 SequenceBarrier 来进行管理, SequenceBarrier 实际生成的是 ProcessingSequenceBarrier
实例,按照如下的调用路径来初始化:
RingBuffer.newBarrier() -> Sequencer.newBarrier() -> new ProcessingSequenceBarriser()
消费者从 RingBuffer 中获取消息时,需要通过 SenquencerBarrier 来确定是否有可用的消息, 使用 SequencerBarrier 的调用路径如下:
BatchEventProcessor.processEvents() -> sequenceBarrier.waitFor()
BatchEventProcessor 是默认使用的消费者,上面我们说到了 EventProcessor + EventHandler 才是一个完整的消费者。用户自己实现 EventHandler 来处理消息的逻辑。而实际从 RingBuffer 中获取消息的逻辑则在 BatchEventProcessor 中实现,关键代码如下:
// BatchEventProcessor.processEvents() 方法,删除了部分代码 while (true) { try { // 获取可用消息的最大值 final long availableSequence = sequenceBarrier.waitFor(nextSequence); // 如果当前的位置小于可用的位置,说明有消息可以处理,进行消息处理 while (nextSequence <= availableSequence) { event = dataProvider.get(nextSequence); eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); // 调用实际的 Handler 处理消息 nextSequence++; } sequence.set(availableSequence); // 将自己的 sequence 设置处理完成的位置 } } 复制代码
如果没有获取到可处理的 sequence, 那么当前的处理消息的 handlers 也会被阻塞。
SequenceBarrier 除了可以控制消费者从 RingBuffer 取数据之外,还可以控制多个消费者执行的顺序。如果要安排消费者执行的顺序,用如下的代码就可以。
disruptor.handleEventsWith(new LongEventHandler()).then(new AnOtherLongEventHandler()); 复制代码
上面的代码表示 AnotherLongEventHandler
需要等 LongEventHandler
处理完成之后,才能对消息进行处理。
消费者之间控制依赖关系其实就是控制 sequence 的大小,如果说 C2 消费者 依赖 C1,那就表示 C2 中 Sequence 的值一定小于等于 C1 的 Sequence。
其中 then 关系是通过 Disruptor.updateGatingSequencesForNextInChain() 方法来实现:
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) { if (processorSequences.length > 0) { ringBuffer.addGatingSequences(processorSequences); for (final Sequence barrierSequence : barrierSequences) { ringBuffer.removeGatingSequence(barrierSequence); } consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); } } 复制代码
其实 Disruptor 控制的秘密就是这些了,其实也不是很复杂,只是实现的方式很巧妙,再加上并发控制没有使用锁,才造就了一个如此高效的框架。
关注微信公众号,聊点其他的