转载

SOFATracer (二) : Disruptor 简单使用

A High Performance Inter-Thread Messaging Library 高性能的线程间消息传递库

关于 Disruptor 的 一些原理分析可以参考:disruptor

案例

先通过 Disruptor 的一个小例子来有个直观的认识;先看下它的构造函数:

public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final ThreadFactory threadFactory,
        final ProducerType producerType,
        final WaitStrategy waitStrategy)
{
    this(
        RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
        new BasicExecutor(threadFactory));
}
复制代码
  • eventFactory : 在环形缓冲区中创建事件的 factory
  • ringBufferSize:环形缓冲区的大小,必须是2的幂。
  • threadFactory:用于为处理器创建线程。
  • producerType:生成器类型以支持使用正确的 sequencerpublisher 创建 RingBuffer ;枚举类型, SINGLEMULTI 两个项。对应于 SingleProducerSequencerMultiProducerSequencer 两种 Sequencer
  • waitStrategy : 等待策略;

如果我们想构造一个 disruptor ,那么我们就需要上面的这些组件。从 eventFactory 来看,还需要一个具体的 Event 来作为消息事件的载体。【下面按照官方给的案例进行简单的修改作为示例】

消息事件 LongEvent ,能够被消费的数据载体

public class LongEvent {
    private long value;
    public void set(long value) {
        this.value = value;
    }
    public long getValue() {
        return value;
    }
}
复制代码

创建消息事件的factory

public class LongEventFactory implements EventFactory<LongEvent> {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}
复制代码

ConsumerThreadFactory

public class ConsumerThreadFactory implements ThreadFactory {
    private final AtomicInteger index = new AtomicInteger(1);
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "disruptor-thread-" + index.getAndIncrement());
    }
}
复制代码

OK ,上面的这些可以满足创建一个 disruptor 了:

private int ringBufferCapacity = 8;
//消息事件生产Factory
LongEventFactory longEventFactory = new LongEventFactory();
//执行事件处理器线程Factory
ConsumerThreadFactory consumerThreadFactory = new ConsumerThreadFactory();
//用于环形缓冲区的等待策略。
WaitStrategy waitStrategy = new BlockingWaitStrategy();

//构建disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(
    longEventFactory,
    ringBufferCapacity,
    longEventThreadFactory,
    ProducerType.SINGLE,
    waitStrategy);
复制代码

现在是已经有了 disruptor 了,然后通过: start 来启动:

//启动 disruptor
 disruptor.start();
复制代码

到这里,已经构建了一个 disruptor ;但是目前怎么使用它来发布消息和消费消息呢?

发布消息

下面在 for 循环中 发布 5 条数据:

RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
for (long l = 0; l < 5; l++)
{
    long sequence = ringBuffer.next();
    LongEvent event = ringBuffer.get(sequence);
    event.set(100+l);
    System.out.println("publish event :" + l);
    ringBuffer.publish(sequence);
    Thread.sleep(1000);
}
复制代码

消息已经发布,下面需要设定当前 disruptor 的消费处理器。前面已经有个 LongEventEventFactory ; 在 disruptor 中是通过 EventHandler 来进行消息消费的。

编写消费者代码

public class LongEventHandler implements EventHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("Event: " + event.getValue()+" -> " + Thread.currentThread().getName());
        Thread.sleep(2000);
    }
}
复制代码

eventHandler 设置到 disruptor 的处理链上

//将处理事件的事件处理程序 -> 消费事件的处理程序
LongEventHandler longEventHandler = new LongEventHandler();
disruptor.handleEventsWith(longEventHandler);
复制代码

运行结果(这里):

publish event :0
Event: 0 -> disruptor-thread-1
-------------------------------->
publish event :1
Event: 1 -> disruptor-thread-1
-------------------------------->
publish event :2
Event: 2 -> disruptor-thread-1
-------------------------------->
publish event :3
Event: 3 -> disruptor-thread-1
-------------------------------->
publish event :4
Event: 4 -> disruptor-thread-1
-------------------------------->
复制代码

基本概念和原理

Disruptor

整个基于 ringBuffer 实现的生产者消费者模式的容器。主要属性

private final RingBuffer<T> ringBuffer;
private final Executor executor;
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();
private final AtomicBoolean started = new AtomicBoolean(false);
private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<>();
复制代码
  • ringBuffer :内部持有一个 RingBuffer 对象, Disruptor 内部的事件发布都是依赖这个 RingBuffer 对象完成的。
  • executor :消费事件的线程池
  • consumerRepository :提供存储库机制,用于将 EventHandlerEventProcessor 关联起来
  • started : 用于标志当前 Disruptor 是否已经启动
  • exceptionHandler : 异常处理器,用于处理 BatchEventProcessor 事件周期中 uncaught exceptions

RingBuffer

环形队列[实现上是一个数组],可以类比为 BlockingQueue 之类的队列, ringBuffer 的使用,使得内存被循环使用,减少了某些场景的内存分配回收扩容等耗时操作。

public final class RingBuffer<E> extends RingBufferFields<E> 
implements Cursored, EventSequencer<E>, EventSink<E> 
复制代码
  • E:在事件的交换或并行协调期间存储用于共享的数据的实现 -> 消息事件

Sequencer

RingBuffer 中 生产者的顶级父接口,其直接实现有 SingleProducerSequencerMultiProducerSequencer ;对应 SINGLEMULTI 两个枚举值。

SOFATracer (二) : Disruptor 简单使用

EventHandler

事件处置器,改接口用于对外扩展来实现具体的消费逻辑。如上面 demo 中的 LongEventHandler ;

//回调接口,用于处理{@link RingBuffer}中可用的事件
public interface EventHandler<T> {
    void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}
复制代码
  • event : RingBuffer 已经发布的事件
  • sequence : 正在处理的事件 的序列号
  • endOfBatch : 用来标识否是来自 RingBuffer 的批次中的最后一个事件

SequenceBarrier

消费者路障。规定了消费者如何向下走。事实上,该路障算是变向的锁。

final class ProcessingSequenceBarrier implements SequenceBarrier {
    //当等待(探测)的需要不可用时,等待的策略
    private final WaitStrategy waitStrategy;
    //依赖的其它Consumer的序号,这个用于依赖的消费的情况,
    //比如A、B两个消费者,只有A消费完,B才能消费。
    private final Sequence     dependentSequence;
    private volatile boolean   alerted = false;
    //Ringbuffer的写入指针
    private final Sequence     cursorSequence;
    //RingBuffer对应的Sequencer
    private final Sequencer    sequencer;
    //exclude method
}
复制代码

waitStrategy 决定了消费者采用何种等待策略。

WaitStrategy

Strategy employed for making {@link EventProcessor}s wait on a cursor {@link Sequence}.

EventProcessor 的等待策略;具体实现在 disruptor 中有8种,

SOFATracer (二) : Disruptor 简单使用

这些等待策略不同的核心体现是在如何实现 waitFor 这个方法上。

EventProcessor

事件处理器,实际上可以理解为消费者模型的框架,实现了线程 Runnablerun 方法,将循环判断等操作封在了里面。该接口有三个实现类:

1、BatchEventProcessor

public final class BatchEventProcessor<T> implements EventProcessor {
    private final AtomicBoolean           running          = new AtomicBoolean(false);
    private ExceptionHandler<? super T>   exceptionHandler = new FatalExceptionHandler();
    private final DataProvider<T>         dataProvider;
    private final SequenceBarrier         sequenceBarrier;
    private final EventHandler<? super T> eventHandler;
    private final Sequence                sequence         = new Sequence(                                      Sequencer.INITIAL_CURSOR_VALUE);
    private final TimeoutHandler          timeoutHandler;
    //exclude method
}
复制代码
  • ExceptionHandler:异常处理器
  • DataProvider:数据来源,对应 RingBuffer
  • EventHandler:处理 Event 的回调对象
  • SequenceBarrier:对应的序号屏障
  • TimeoutHandler:超时处理器,默认情况为空,如果要设置,只需要要将关联的 EventHandler 实现 TimeOutHandler 即可。

如果我们选择使用 EventHandler 的时候,默认使用的就是 BatchEventProcessor ,它与 EventHandler 是一一对应,并且是单线程执行。

如果某个 RingBuffer 有多个 BatchEventProcessor ,那么就会每个 BatchEventProcessor 对应一个线程。

2、WorkProcessor

public final class WorkProcessor<T> implements EventProcessor {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final RingBuffer<T> ringBuffer;
    private final SequenceBarrier  sequenceBarrier;
    private final WorkHandler<? super T> workHandler;
    private final ExceptionHandler<? super T> exceptionHandler;
    private final Sequence workSequence;

    private final EventReleaser eventReleaser = new EventReleaser() {
            @Override
            public void release() {
                sequence.set(Long.MAX_VALUE);
            }
    };
    private final TimeoutHandler timeoutHandler;
}
复制代码

基本和 BatchEventProcessor 类似,不同在于,用于处理 Event 的回调对象是 WorkHandler

原理图

SOFATracer (二) : Disruptor 简单使用

无消费者情况下,生产者保持生产,但是 remainingCapacity 保持不变

在写 demo 的过程中,本来想通过不设定 消费者 来观察 RingBuffer 可用容量变化的。但是验证过程中,一直得不到预期的结果,(注:没有设置消费者,只有生产者),先看结果:

publish event :0
bufferSie:8
remainingCapacity:8
cursor:0
-------------------------------->
publish event :1
bufferSie:8
remainingCapacity:8
cursor:1
-------------------------------->
publish event :2
bufferSie:8
remainingCapacity:8
cursor:2
-------------------------------->
publish event :3
bufferSie:8
remainingCapacity:8
cursor:3
-------------------------------->
publish event :4
bufferSie:8
remainingCapacity:8
cursor:4
-------------------------------->
publish event :5
bufferSie:8
remainingCapacity:8
cursor:5
-------------------------------->
publish event :6
bufferSie:8
remainingCapacity:8
cursor:6
-------------------------------->
publish event :7
bufferSie:8
remainingCapacity:8
cursor:7
-------------------------------->
publish event :8
bufferSie:8
remainingCapacity:8
cursor:8
-------------------------------->
publish event :9
bufferSie:8
remainingCapacity:8
cursor:9
-------------------------------->
复制代码

从结果来看, remainingCapacity 的值应该随着 发布的数量 递减的;但是实际上它并没有发生任何变化。

来看下 ringBuffer.remainingCapacity() 这个方法:

/**
 * Get the remaining capacity for this ringBuffer.
 *
 * @return The number of slots remaining.
 */
public long remainingCapacity()
{
    return sequencer.remainingCapacity();
}
复制代码

这里面又使用 sequencer.remainingCapacity() 这个方法来计算的。上面的例子中使用的是 ProducerType.SINGLE ,那来看 SingleProducerSequencer 这个里面 remainingCapacity 的实现。

@Override
public long remainingCapacity()
{
    //上次申请完毕的序列值
    long nextValue = this.nextValue;
    //计算当前已经消费到的序列值
    long consumed = Util.getMinimumSequence(gatingSequences, nextValue);
    //当前生产到的序列值
    long produced = nextValue;
    return getBufferSize() - (produced - consumed);
}
复制代码

来解释下这段代码的含义:

假设当前 ringBufferbufferSize 是 8 ;上次申请到的序列号是 5,其实也就是说已经生产过占用的序列号是5;假设当前已经消费到的序列号是 3,那么剩余的容量为: 8-(5-2) = 5;

SOFATracer (二) : Disruptor 简单使用

因为这里我们可以确定 bufferSizeproduced 的值了,那么 remainingCapacity 的结果就取决于 getMinimumSequence 的计算结果了。

public static long getMinimumSequence(final Sequence[] sequences, long minimum)
{
    for (int i = 0, n = sequences.length; i < n; i++)
    {
        long value = sequences[i].get();
        minimum = Math.min(minimum, value);
    }
    return minimum;
}
复制代码

这个方法是从 Sequence 数组中获取最小序列 。如果 sequences 为空,则返回 minimum 。回到上一步,看下 sequences 这个数组是从哪里过来的,它的值在哪里设置的。

long consumed = Util.getMinimumSequence(gatingSequences, nextValue);
复制代码

gatingSequencesSingleProducerSequencer 父类 AbstractSequencer 中的成员变量:

protected volatile Sequence[] gatingSequences = new Sequence[0];
复制代码

gatingSequences 是在下面这个方法里面来管理的。

/**
 * @see Sequencer#addGatingSequences(Sequence...)
 */
@Override
public final void addGatingSequences(Sequence... gatingSequences)
{
    SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}
复制代码

这个方法的调用栈向前追溯有这几个地方调用了:

SOFATracer (二) : Disruptor 简单使用

WorkerPool 来管理多个消费者; hangdlerEventsWith 这个方法也是用来设置消费者的。但是在上面的测试案例中我们是想通过不设定消费者 只设定生成者 来观察 环形队列的占用情况,所以 gatingSequences 会一直是空的,因此在计算时会把 produced 的值作为 minimum 返回。这样每次计算就相当于:

return getBufferSize() - (produced - produced) === getBufferSize();
复制代码

也就验证了为何在不设定消费者的情况下, remainingCapacity 的值会一直保持不变。

原文  https://juejin.im/post/5b655e7ee51d4519226fa4bd
正文到此结束
Loading...