Disruptor 使用方法
这篇文章我犹豫了很久到底要不要单独写,因为只是一个第三方库的使用实例展示。但是Disruptor是Log4j2中异步Logger的核心数据结构,讲解其原理前有必要单独介绍一下Disruptor的简单使用方法。这篇文章用一个简单的Demo简介Disruptor的使用方法
Disruptor是LMAX的一个并发框架,LMAX是一个新型的零售交易平台,特点是低延迟&高吞吐,这个框架构建在JVM之上,整个业务逻辑的处理器完全在内存之上运行,业务逻辑处理器的核心是Disruptors,这是一个并发组件,能够在无锁的情况下实现网络的Queue并发操作。我们可以很简单的在该框架上实现生产者-消费者模型。
Disruptor使用方法如下:
下面我以一个Demo详细说明Disruptor的使用方法,Demo的功能非常简单,是一个典型的生产者-消费者模型,生产者负责生产String类型的消息,消费者消费消费数据并在Console输出,下面分步说明
public class StringEvent { private String value; public String getValue() { return value; } public void setValue(String value) { this.value = value; } }
public class StringEventFactory implements EventFactory { @Override public Object newInstance() { return new StringEvent(); } }
public class StringEventHandler implements EventHandler<StringEvent> { @Override public void onEvent(StringEvent longEvent, long l, boolean b) { System.out.println(longEvent.getValue()); } }
public class StringEventProducer { private final RingBuffer<StringEvent> ringBuffer; public StringEventProducer(RingBuffer<StringEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(String value) { long seq = ringBuffer.next();//获取下一个空闲事件槽 try { StringEvent event = ringBuffer.get(seq); event.setValue(value); }finally { ringBuffer.publish(seq); //向对应的事件槽发布事件 } } }
public class DisruptorTest { public static void main(String[] args) throws InterruptedException { Executor executor = Executors.newSingleThreadExecutor();//定义线程池 StringEventFactory factory = new StringEventFactory(); int bufferSize = 1024; Disruptor<StringEvent> disruptor = new Disruptor<StringEvent>(factory, bufferSize, executor);//实例化Disruptor disruptor.handleEventsWith(new StringEventHandler());//绑定事件处理函数 disruptor.start();//启动Disruptor RingBuffer<StringEvent> ringBuffer = disruptor.getRingBuffer(); StringEventProducer producer = new StringEventProducer(ringBuffer); for (Long i = 0L; i < 100L; i++) { producer.onData("BryantChangXY" + i); Thread.sleep(100); } disruptor.shutdown(); ((ExecutorService) executor).shutdown(); } }
BryantChangXY92 BryantChangXY93 BryantChangXY94 BryantChangXY95 BryantChangXY96 BryantChangXY97 BryantChangXY98 BryantChangXY99
在这个Demo的实现过程中,最重要的步骤则是事件发布与事件处理,事件发布主要是通过调用ringBuffer.next()方法获取RingBuffer的下一个空闲事件槽,并调用publish方法将事件发布出去。publish方法如果调用异常,当多线程同时生产时会造成冲突,为此Disruptor框架给出了Translator的方式,这个方式保证每一次publish是事务的,代码流程如下:
public class StringEventProducerWithTranslator { private static final EventTranslatorOneArg<StringEvent, String> TRANSLATOR = new EventTranslatorOneArg<StringEvent, String>() { public void translateTo(StringEvent event, long sequence, String bb) { event.setValue(bb); } }; private RingBuffer<StringEvent> ringBuffer; public StringEventProducerWithTranslator(RingBuffer<StringEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(String data) { ringBuffer.publishEvent(TRANSLATOR, data); } }
至此,我们介绍了Disruptor的核心使用流程。下面的文章中我将详细介绍AsyncLogger的设计架构以及流程,同时解读异步Appender和异步Logger的区别
谢谢你请我吃糖果