The LMAX Disruptor is a high performance inter-thread messaging library. It grew out of LMAX's research into concurrency, performance and non-blocking algorithms and today forms a core part of their Exchange's infrastructure. (LMAX Disruptor是一个高性能的线程间消息传递库。它源于LMAX对并发性、性能和非阻塞算法的研究,如今已成为其Exchange基础架构的核心部分。) -- 引用自GITHUB介绍
Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。以下是介绍wiki地址:
https://github.com/LMAX-Exchange/disruptor/wiki 复制代码
Disruptor通过以下设计来解决队列速度慢的问题: 环形数组结构 为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。 元素位置定位 数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。 无锁设计 每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。 下面忽略数组的环形结构,介绍一下如何实现无锁设计。整个过程通过原子变量CAS,保证操作的线程安全。
另一个关键的实现低延迟的细节就是在Disruptor中利用无锁的算法,所有内存的可见性和正确性都是利用内存屏障或者CAS操作。使用CAS来保证多线程安全,与大部分并发队列使用的锁相比,CAS显然要快很多。CAS是CPU级别的指令,更加轻量,不必像锁一样需要操作系统提供支持,所以每次调用不需要在用户态与内核态之间切换,也不需要上下文切换。 只有一个用例中锁是必须的,那就是BlockingWaitStrategy(阻塞等待策略),唯一的实现方法就是使用Condition实现消费者在新事件到来前等待。许多低延迟系统使用忙等待去避免Condition的抖动,然而在系统忙等待的操作中,性能可能会显著降低,尤其是在CPU资源严重受限的情况下,例如虚拟环境下的WEB服务器。
这里我们按照原作者Demo介绍制作一个放入LongValue的生产者和消费者模型,相关的代码如下所示:
<dependencies> <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.2.1</version> </dependency> </dependencies> 复制代码
//定义事件event 通过Disruptor 进行交换的数据类型。 public class LongEvent { private Long value; public Long getValue() { return value; } public void setValue(Long value) { this.value = value; } } 复制代码
public class LongEventFactory implements EventFactory<LongEvent> { public LongEvent newInstance() { return new LongEvent(); } } 复制代码
// 消费者获得数据 public class LongEventHandler implements EventHandler<LongEvent> { @Override public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception { System.out.println("消费者获得数据:" + longEvent.getValue()); } } 复制代码
// 生产者 public class LongEventProducer { private RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer byteBuffer) { // 获取事件队列的下表位置 long sequence = ringBuffer.next(); try { // 取出空队列 LongEvent longEvent = ringBuffer.get(sequence); // 给空队列赋值 longEvent.setValue(byteBuffer.getLong(0)); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("生产者发送数据...."); ringBuffer.publish(sequence); } } } 复制代码
public class MainTest { public static void main(String[] args) { // 1. 创建线程池 ExecutorService executor = Executors.newCachedThreadPool(); // 2. 创建工厂 LongEventFactory longEventFactory = new LongEventFactory(); // 3.创建ringbuffer 大小 int ringbuffer = 1024 * 1024; // 2的N次方 // 4. 创建disruptor Disruptor<LongEvent> longEventDisruptor = new Disruptor<>( longEventFactory, ringbuffer, executor, ProducerType.MULTI, new YieldingWaitStrategy() ); // 5. 连接消费者 longEventDisruptor.handleEventsWith(new LongEventHandler()); // 6. 启动 longEventDisruptor.start(); // 7.创建ringbuffer容器 RingBuffer<LongEvent> ringBuffer = longEventDisruptor.getRingBuffer(); // 8.创建生产者 LongEventProducer longEventProducer = new LongEventProducer(ringBuffer); // 9. 指定缓冲区的大小 ByteBuffer byteBuffer = ByteBuffer.allocate(8); for (int i = 0; i < 10; i++) { byteBuffer.putLong(0,i); longEventProducer.onData(byteBuffer); } executor.shutdown(); longEventDisruptor.shutdown(); } } 复制代码
生产者发送数据.... 生产者发送数据.... 生产者发送数据.... 生产者发送数据.... 消费者获得数据:0 生产者发送数据.... 消费者获得数据:1 生产者发送数据.... 消费者获得数据:2 生产者发送数据.... 消费者获得数据:3 生产者发送数据.... 消费者获得数据:4 生产者发送数据.... 消费者获得数据:5 生产者发送数据.... 消费者获得数据:6 消费者获得数据:7 消费者获得数据:8 消费者获得数据:9 复制代码