public class DisruptorExample { public static void main(String[] args) throws InterruptedException { // RingBuffer大小,必须是2的N次方 int bufferSize = 1024; // 构建Disruptor Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE); // 注册事件处理器 disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("E: " + event)); // 启动Disruptor disruptor.start(); RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); // 生产Event ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l); // 生产者生产消息 ringBuffer.publishEvent((event, sequence, buffer) -> event.setValue(buffer.getLong(0)), bb); TimeUnit.SECONDS.sleep(1); } } } @Data class LongEvent { private long value; }
// com.lmax.disruptor.RingBufferFields for (int i = 0; i < bufferSize; i++) { entries[BUFFER_PAD + i] = eventFactory.newInstance(); }
/** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count;
// 前:填充56字节 class LhsPadding { protected long p1, p2, p3, p4, p5, p6, p7; } class Value extends LhsPadding { protected volatile long value; } // 后:填充56字节 class RhsPadding extends Value { protected long p9, p10, p11, p12, p13, p14, p15; } public class Sequence extends RhsPadding { }
// com.lmax.disruptor.MultiProducerSequencer public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long current; long next; // 生产者获取n个写入位置 do { // current相当于入队索引,表示上次生产到这里 current = cursor.get(); // 目标是再生产n个 next = current + n; // 减掉一个循环 long wrapPoint = next - bufferSize; // 获取上一次的最小消费位置 long cachedGatingSequence = gatingSequenceCache.get(); // 没有足够的空余位置 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { // 重新计算所有消费者里面的最小值位置 long gatingSequence = Util.getMinimumSequence(gatingSequences, current); // 仍然没有足够的空余位置,出让CPU使用权,重新执行下一循环 if (wrapPoint > gatingSequence) { LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? continue; } // 重新设置上一次的最小消费位置 gatingSequenceCache.set(gatingSequence); } else if (cursor.compareAndSet(current, next)) { // 获取写入位置成功,跳出循环 break; } } while (true); return next; }
转载请注明出处:http://zhongmingmao.me/2019/05/31/java-concurrent-disruptor/
访问原文「 Java并发 -- Disruptor 」获取最佳阅读体验并参与讨论