You should never do your asynchronous work alone. — Jon Brisbin 完成Reactor 1后写到 You should never do your asynchronous work alone. — Stephane Maldini 完成Reactor 2后写到
名称解释:back pressure:背压。在 交换机 在阻止外来 数据包 发送到堵塞端口的时候可能会发生丢包。而背压就是考验 交换机 在这个时候避免丢包的能力。很多的 交换机 当发送或接收 缓冲区溢出 的时候通过将阻塞信号发送回源地址来实现背压。 交换机 在全双工时使用IEEE802.3x 流控制 达到同样目的。
首先,我们使用groovy示例来展示core模块的功能:
//Initialize context and get default dispatcher Environment.initialize() //RingBufferDispatcher with 8192 slots by default def dispatcher = Environment.sharedDispatcher() //Create a callback Consumer<Integer> c = { data -> println "some data arrived: $data" } //Create an error callback Consumer<Throwable> errorHandler = { it.printStackTrace } //Dispatch data asynchronously r.dispatch(1234, c, errorHandler) Environment.terminate()
下面,我们使用Stream reactive实现来看:
//standalone async processor def processor = RingBufferProcessor.<Integer>create() //send data, will be kept safe until a subscriber attaches to the processor processor.onNext(1234) processor.onNext(5678) //consume integer data processor.subscribe(new Subscriber<Integer>(){ void onSubscribe(Subscription s){ //unbounded subscriber s.request Long.MAX } void onNext(Integer data){ println data } void onError(Throwable err){ err.printStackTrace() } void onComplete(){ println 'done!' } } //Shutdown internal thread and call complete processor.onComplete()
Core模块概览
Reactor core模块的子单元:
Common IO和功能类型,一些是直接从java8 功能接口回迁的。 Function、Supplier、consumer、Predicate、BiConsumer、BiFunction Tuples Resource、Pausable、Timer Buffer,Codec和一组预定义的Codec。 Environment 上下文 Dispatcher 协议和一组预定义的Dispatcher。 预定义的Reactive Stream Processor
reactor-core可以用来逐渐替代另外的消息传递策略、调度时间任务或者以小的功能块组织代码。这种突破使开发者与其它Reactive基础库更好的合作,特别是对于没有耐心的开发者,没有了对RingBuffer的理解负担。
注意:Reactor-core隐藏了LMAX disruptor,因此不会出现也不会和现有的Disruptor依赖冲突。
功能模块重用是核心,通常情况下在你使用Reactor时就需要的功能。因此,功能编程酷在哪里?其中一个核心理念是将可执行代码看做别的数据。另一点,类似于Closure或者匿名函数,此时业务逻辑由最初的调用者决定。它同样避免了过量的If/SWITCH模块,并且这种分离是概念更清晰:每个模块完成一个功能且不需要共享任何东西。
组织功能模块
每个功能组件都给出它的一般任务的明确意图:
Consumer:简单回调--一劳永逸的
BiCounsumer:两个参数的简单回调,通常用在序列比较,例如:前一个和下一个参数。
Function:转换逻辑--请求/应答
BiFunction:两个参数的转换,通常用在累加器,比较前一个和下一个参数,返回一个新的值。
Supplier:工厂逻辑--轮询
Predicate:测试路径--过滤
注意:我们也将Publisher和Subscriber视作功能块,敢于称之为Reactive功能块。尽管如此,它们作为基础组件,广泛应用到Reactor及其其它地方。Stream API接收reactor.fn参数,为你创建合适的Subscriber。
好消息是在功能模块中包装可执行指令可以向砖块一样进行复用。
Consumer<String> consumer = new Consumer<String>(){ @Override void accept(String value){ System.out.println(value); } }; //Now in Java 8 style for brievety Function<Integer, String> transformation = integer -> ""+integer; Supplier<Integer> supplier = () -> 123; BiConsumer<Consumer<String>, String> biConsumer = (callback, value) -> { for(int i = 0; i < 10; i++){ //lazy evaluate the final logic to run callback.accept(value); } }; //note how the execution flows from supplier to biconsumer biConsumer.accept( consumer, transformation.apply( supplier.get() ) );
最初听起来,这可能不是一个引人注目的革命性变革。但是这种基本思维模式的改变,将揭示我们使异步代码变的稳健和可组合性的使命是多么可贵。Dispatcher分发器将输入数据和错误回调分发给consumer来处理。Reactor Stream模块将更好的使用这些组件。
当使用Ioc容器如spring时,一个好的开发者将利用Java的配置属性来返回一个无状态的功能bean。然后可以优美的注入到stream Pipeline或者分发他们的执行代码中的block中。
你可以注意到这些接口,它们对输入参数和比较少的固定数量的参数的泛型有很好的支持。你怎么传递超过1个或者超过2个的参数呢?答案是使用元组Tuple,Tuple类似于csv中一个单独实例的一样,可以在在功能性编程中保证它们的类型安全和支持多个数量的参数。
以前面的例子为例,我们尝试提供两个参数的BiConsumer而使用单个参数的Consumer
Consumer<Tuple2<Consumer<String>, String>> biConsumer = tuple -> { for(int i = 0; i < 10; i++){ //Correct typing, compiler happy tuple.getT1().accept(tuple.getT2()); } }; biConsumer.accept( Tuple.of( consumer, transformation.apply(supplier.get()) ) );
注意:Tuple需要分配更多的空间,因此在比较或者键值信号等一般使用场景中更多直接使用Bi***组件。
功能性构建块已经准备就绪,让我们使用它们来进行异步编程。第一步是到Dispatcher分区。
在我们启动任意Dispatcher前,需要保证可以有效的创建它们。通常,创建它们的代价比较高,原因是需要预分配一个内存分区来保持分配的信号,这就是前言中介绍的著名的运行时分配和启动时预分配的不同对比。因此提出了一个名为"Environment"共享上下文概念,使用它来管理这些不同类型的Dispatcher,从而避免不必要的创建开销。
Environment
reactor的使用者(或者可用的扩展库如@Spring)创建或者停止Environment。它们自动从 META_INF/reactor/reactor-environment.properties 处读取配置文件。
注意,属性文件可以改变,通过在classpath下的META-INFO/reactor目录下一个新的属性配置可以改变属性文件。
通过传递下面的环境变量reactor.profiles.active来在运行时段改变默认的配置文件。
java - jar reactor-app.jar -Dreactor.profiles.active=turbo
启动和停止Environment
Environment env = Environment.initialize(); //Current registered environment is the same than the one initialized Assert.isTrue(Environment.get() == env); //Find a dispatcher named "shared" Dispatcher d = Environment.dispatcher("shared"); //get the Timer bound to this environment Timer timer = Environment.timer(); //Shutdown registered Dispatchers and Timers that might run non-daemon threads Environment.terminate(); //An option could be to register a shutdownHook to automatically invoke terminate.
注意:在一个给定的Jvm应用中,最好只维护一个Enviroment.在大多数情况下,使用Environment.initializeIfEmpty()就完全ok。
Dispacher分发器
从Reactor 1开始,Dispatcher就存在了。Dispatcher通常抽象消息传递的方法,和Java Executor有类似的通用约定。事实上Dispatcher继承自Executor。
Dispatcher对有数据信号的传送方式及消费者同步或异步执行的错误信息有一套比较严格的类型限制约定。这种方式在面对经典的Executors时解决了第一个问题--错误隔离。效果如下:
错误消费者的调用不需要终端当前分配的资源。如果没有指定,它默认从当前存在的Environment中去寻找,并使用指定给它的errorJournalConsumer。
异步Dispatche提供的第二个独特的特征是运行使用尾部递归策略来再次调度。尾部递归的应用场景是分发器发现Dispatcher的classLoader已经分配到正在运行的线程,这时,当当前消费者返回时将要执行的task放入到队列中。
使用一个类似于 Groovy Spock test 的异步的多线程分发器:
import reactor.core.dispatch.* //... given: def sameThread = new SynchronousDispatcher() def diffThread = new ThreadPoolExecutorDispatcher(1, 128) def currentThread = Thread.currentThread() Thread taskThread = null def consumer = { ev -> taskThread = Thread.currentThread() } def errorConsumer = { error -> error.printStackTrace() } when: "a task is submitted" sameThread.dispatch('test', consumer, errorConsumer) then: "the task thread should be the current thread" currentThread == taskThread when: "a task is submitted to the thread pool dispatcher" def latch = new CountDownLatch(1) diffThread.dispatch('test', { ev -> consumer(ev); latch.countDown() }, errorConsumer) latch.await(5, TimeUnit.SECONDS) // Wait for task to execute then: "the task thread should be different when the current thread" taskThread != currentThread
注意:
如Java Executor一样,它们缺少了我们将加入到Reactor 2.x的一个特点:Reactive stream协议。这时在Reactor中仅有几个未完成事项中的一个未完成事项--没有将Reactive stream标准直接绑定到Reactor中。然后,你可以在Stream章节部分找到快速结合Reactor stream的方法。
表3 Dispatcher家族介绍
Dispatcher | From Environment | Description | Strengths | Weaknesses |
---|---|---|---|---|
RingBuffer | sharedDispatcher() | An LMAX Disruptor RingBuffer based Dispatcher. | Small latency peaks tolerated Fastest Async Dispatcher, 10-15M+ dispatch/sec on commodity hardware Support ordering | 'Spin' Loop when getting the next slot on full capcity Single Threaded, no concurrent dispatch |
Mpsc | sharedDispatcher() if Unsafe not available | Alternative optimized message-passing structure. | Latency peaks tolerated 5-10M+ dispatch/sec on commodity hardware Support ordering | Unbounded and possibly using as much available heap memory as possible Single Threaded, no concurrent dispatch |
WorkQueue | workDispatcher() | An LMAX Disruptor RingBuffer based Dispatcher. | Latency Peak tolerated for a limited time Fastest Multi-Threaded Dispatcher, 5-10M+ dispatch/sec on commodity hardware | 'Spin' Loop when getting the next slot on full capcity Concurrent dispatch Doesn’t support ordering |
Synchronous | dispatcher("sync") or SynchronousDispatcher. INSTANCE | Runs on the current thread. | Upstream and Consumer executions are colocated Useful for Test support Support ordering if the reentrant dispatch is on the current thread | No Tail Recursion support Blocking |
TailRecurse | tailRecurse() or TailRecurse Dispatcher. INSTANCE | Synchronous Reentrant Dispatcher that enqueue dispatches when currently dispatching. | Upstream and Consumer executions are colocated Reduce execution stack, greatly expanded by functional call chains | Unbounded Tail Recurse depth Blocking Support ordering (Thread Stealing) |
ThreadPoolExecutor | newDispatcher(int, int, DispatcherType. THREAD_POOL_EXECUTOR) | Use underlying ThreadPoolExecutor message-passing | Multi-Threaded Blocking Consumers, permanent latency tolerated 1-5M+ dispatch/sec on commodity hardware | Concurrent run on a given consumer executed twice or more Unbounded by default Doesn’t support ordering |
Traceable Delegating | N/A | Decorate an existing dispatcher with TRACE level logs. | Dispatch tapping Runs slower than the delegated dispatcher alone | Log overhead (runtime, disk) |
你可能已经注意到了,一些Dispatcher事单线程的,特别是RingBufferDispatcher和MpsDispatcher。更进一步,根据Reactive Stream规范,Subscriber/Processor的实现是不允许并发通知的。这一点尤其对Reactor Streams产生了影响,使用Stream.dispachOn(Dispatcher)和一个Dispatcher来给并发信号的显示失败留后门。
然后,有一个方法来避免这个缺点,使用Dispatcher池DispatcherSupplier。实际上,作为Supplier的工厂,Supplier.get()方法根据有趣的共享策略:轮询、最少使用。。等间接提供一个Dispatcher。
Enviroment提供了一个静态方法去创建、并注册到当前活跃Environment的Dispatcher池:一组轮询的返回Dispatcher。一旦就绪,Supplier提供对Dispatcher数目的控制。
不同于一般的Dispatcher,Environment提供了一站式的管理服务:
Environment.initialize(); //.... //Create an anonymous pool of 2 dispatchers with automatic default settings (same type than default dispatcher, default backlog size...) DispatcherSupplier supplier = Environment.newCachedDispatchers(2); Dispatcher d1 = supplier.get(); Dispatcher d2 = supplier.get(); Dispatcher d3 = supplier.get(); Dispatcher d4 = supplier.get(); Assert.isTrue( d1 == d3 && d2 == d4); supplier.shutdown(); //Create and register a new pool of 3 dispatchers DispatcherSupplier supplier1 = Environment.newCachedDispatchers(3, "myPool"); DispatcherSupplier supplier2 = Environment.cachedDispatchers("myPool"); Assert.isTrue( supplier1 == supplier2 ); supplier1.shutdown();
Dispatcher尽可能快的计算接收的任务,然而,Timer定时器提供一次性或者周期性的调度API。Reactor Core模块默认提供了一个HashWheelTimer定时器,它自动绑定到任意的新的Environment中。HashWheelTimer对处理大量的、并发的、内存调度任务有巨大的优势,它是替换java TaskScheduler的一个强大的选项。
注意:它不是一个持久化的调度器,应用关闭时task将会丢失。下个正式版本Timer定时器将会有一些改变,例如使用redis增加持久化/共享,请关注。
创建一个简单的定时器:
import reactor.fn.timer.Timer //... given: "a new timer" Environment.initializeIfEmpty() Timer timer = Environment.timer() def latch = new CountDownLatch(10) when: "a task is submitted" timer.schedule( { Long now -> latch.countDown() } as Consumer<Long>, period, TimeUnit.MILLISECONDS ) then: "the latch was counted down" latch.await(1, TimeUnit.SECONDS) timer.cancel() Environment.terminate()
核心Processor用来做比Dispatcher更集中的job:支持背压计算异步task。
提供了org.reactivestreams.Processor接口的直接实现,因此可以很好的和别的Reactive Stream厂商一起工作。
记住:Processor即是Subscriber也是Publisher,因此你可以在想要的地方(source,processing,sink)将一个Processor插入到Reactive stream chain中。
注意:规范不推荐直接使用Processor.onNext(d)。
基于RingBuffer的Reactive Stream Processor的优点如下:
高吞吐量
重启时不会丢掉没有消费的数据,且从最近的没有消费的数据开始执行
若没有Subscriber监听,数据不会丢失(不想Reactor-stream的Broadcaster会丢掉数据)
若在消息处理过程中取消Subscriber,信号将会安全的重新执行,实际上它能在RingBufferProcessor上很好的工作。
灵活的背压,它允许任意时间内有限数量的背压,Subscriber会消费掉并且请求更多的数据。
传播的背压,因为它是一个Processor,它可以通过订阅方式传递消息。
多线程的出/入Processor。
它们的唯一缺点是它们在运行时创建它们会消耗大量的资源,原因是它们不像它们的兄弟RingBufferDispatcher可以很容易的共享,这种特性使它们更适应于高吞吐量的预定义数据管道。
Reactor的RingBufferProcessor组件本质上是Disruptor的RingBuffer,设计的目的是尽可能的和原生的效率一样。使用场景是:你需要分发task到另外一个线程,且该线程具有低耗、高吞吐量还在你的工作流中管理背压。
我使用RingBufferProcessor来计算远程异步调用的各种输出:AMQP, SSD存储和内存存储,Process完全处理掉易变的延迟,每秒百万级别的消息的数据源从来没有阻塞过。 — 友好的Reactor使用者 RingBufferProcessor的使用场景
图7 在跟定时间T内,一个ringbufferprocessor,2个消费同一个sequence的Subscriber。
你可以使用静态工具方法去创建一个ringbufferprocessor:
Processor<Integer, Integer> p = RingBufferProcessor.create("test", 32); //1 Stream<Integer> s = Streams.wrap(p); //2 s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); //3 s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); //4 s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); //5 input.subscribe(p); //5
1.创建一个Processor,让它具有32个slot的内部RingBuffer。
2. 从Reactive Streams Processor创建一个Reactor。
3. 每个请求调用consume方法在自己的线程内创建一个Disruptor的EventProcessor。
4. 每个请求调用consume方法在自己的线程内创建一个Disruptor的EventProcessor。
5. 每个请求调用consume方法在自己的线程内创建一个Disruptor的EventProcessor。
6. 向一个Reactive Streams Publisher订阅这个Processor。
传递到Processor的Subscribe.onNext(Buffer)方法的每个数据元素将广播给所有的消费者。这个Processor没有使用轮询分发,因为它在RingBufferWorkProcess中,RingBufferWorkProcess下面将要讨论。若传递1、2、3三个整数到Processor,可以看到控制台输出结果如下:
Thread[test-2,5,main] data=1 Thread[test-1,5,main] data=1 Thread[test-3,5,main] data=1 Thread[test-1,5,main] data=2 Thread[test-2,5,main] data=2 Thread[test-1,5,main] data=3 Thread[test-3,5,main] data=2 Thread[test-2,5,main] data=3 Thread[test-3,5,main] data=3
每个线程接收到传给Process的所有数据,每个线程顺序获得数据,因为内部使用RingBuffer管理
slot来发布数据。
不像标准的RingBufferProcessor只广播它的值给所有的消费者,RingBufferWorkProcessor基于消费者的多少来分发请求值。Processor接收信息,然后轮询发送到不同的线程中(因为每个消费者有自己独立的线程),然而使用内部RingBuffer来有效管理消息的发布。
我们构造了一个可扩展的、多种htp微服务器请求负载均衡的RingBufferWorkProcessor.说它看起来快过光速可能是我错了,另外gc的压力完全可控。
— 使用RingBufferWorkProcessor的Reactor友好者
使用RingBufferWorkProcessor非常简单,你只要改变上面示例代码的引用到静态的create方法创建。使用RingBufferWorkProcessor如下,其它的代码时一样的。
Processor< Integer, Integer> p = RingBufferWorkProcessor.create( " test ", 32);
创建一个具有32个slot的内部RingBuffer的Processor。
现在,发布消息到Processor时,将不会广播给每一个consumer,会根据消费者的数目分发给不同的消费者。运行示例,结果如下:
Thread[test-2,5,main] data=3 Thread[test-3,5,main] data=2 Thread[test-1,5,main] data=1
注意,RingBufferWorkProcessor会重复终端的信号、检测正在停止工作的Subscriber的取消异常,最终会被别的Subscriber执行一次。我们保证适合事件至少发送一次。若你理解这个语义,你可能会立即说“等等,RingBufferWorkProcessor怎么作为一个消息代理工作啦?” 答案是肯定的。
字节码操作对大量数据管道配置的应用是一个核心关注点。reactor-net广泛使用字节码操作来对接收的字节码进行编组和分组或者通过IO发送。
reactor.io.buffer.Buffer是java byteBuffer处理的一个装饰器,增加了一些列的操作。目的是通过使用ByteBuffer的limit和读取/覆盖预先分配的字节来减少字节的复制。追踪ByteBuffer的位置是开发人员口头的问题,Buffer简化了这些,我们只需要关注这个简单的工具就可以了。
下面是一个简单的Buffer操作示例:
import reactor.io.buffer.Buffer //... given: "an empty Buffer and a full Buffer" def buff = new Buffer() def fullBuff = Buffer.wrap("Hello World!") when: "a Buffer is appended" buff.append(fullBuff) then: "the Buffer was added" buff.position() == 12 buff.flip().asString() == "Hello World!"
Buffer的一个有用的应用是Buffer.View,多个操作例如split都会返回Buffer.View。它提供了一个无需拷贝的方式去扫描和检索ByteBuffer的字节码。Buffer.View同样也是一种Buffer。
使用一个分隔符和Buffer.view使块数据读取可以复用同样的字节码
byte delimiter = (byte) ';'; byte innerDelimiter = (byte) ','; Buffer buffer = Buffer.wrap("a;b-1,b-2;c;d"); List<Buffer.View> views = buffer.split(delimiter); int viewCount = views.size(); Assert.isTrue(viewCount == 4); for (Buffer.View view : views) { System.out.println(view.asString()); //prints "a" then "b-1,b-2", then "c" and finally "d" if(view.indexOf(innerDelimiter) != -1){ for(Buffer.View innerView : view.split(innerDelimiter)){ System.out.println(innerView.asString()); //prints "b-1" and "b-2" } } }
使用Buffer应用到普通的分组和编组对开发者来说可能显得不够高级,Reactor提供了一系列名称为Codec的预定义的转换器。一些Codec需要在classpath路径下添加一些额外的依赖,如json操作的Jackson依赖。
codec以两种方式工作:第一,继承Function去直接编码并返回编码好的数据,通常以Buffer的形式返回。这非常棒,但仅限于与无状态的Codec才能起效,另外一个可选的方法是使用Codec.encoder来返回编码函数。
Codec.encoder()对比Codec.apply(Source) Codec.encoder() 返回一个唯一的编码函数,这个编码函数不能被不同线程共享。 Codec.apply(Source) 直接编码(并保存分配的编码器), 但Codec本身可以在线程间共享。
对大部分实现了Buffer的codec来说,Codec同样也可以根据source类型去解码数据。
解码数据源,需要使用Codec.decoder()获取解码函数。和编码不同的是,没有为编码目的而重写的快捷方法。和编码相同的是,解码函数不能在线程间共享。
有两种形式的Code.decoder()函数,Codec.decoder()是一个阻塞的解码函数,它直接从传递源数据解码返回解码后的数据。Codec.decoder(Consumer)用作非阻塞的解码,它返回null,一旦解码只触发的Consumer,它可以和其它异步工具结合使用。
使用一个预定义的codec示例如下:
import reactor.io.json.JsonCodec //... given: 'A JSON codec' def codec = new JsonCodec<Map<String, Object>, Object>(Map); def latch = new CountDownLatch(1) when: 'The decoder is passed some JSON' Map<String, Object> decoded; def callbackDecoder = codec.decoder{ decoded = it latch.countDown() } def blockingDecoder = codec.decoder() //yes this is real simple async strategy, but that's not the point here :) Thread.start{ callbackDecoder.apply(Buffer.wrap("{/"a/": /"alpha/"}")) } def decodedMap = blockingDecoder.apply(Buffer.wrap("{/"a/": /"beta/"}") then: 'The decoded maps have the expected entries' latch.await() decoded.size() == 1 decoded['a'] == 'alpha' decodedMap['a'] == 'beta'
可用的核心Codec
名称 | 描述 | 需要的依赖 |
---|---|---|
ByteArrayCodec | Wrap/unwrap byte arrays from/to Buffer. | N/A |
DelimitedCodec | Split/Aggregate Buffer and delegate to the passed Codec for unit marshalling. | N/A |
FrameCodec | Split/Aggregate Buffer into | N/A |
JavaSerializationCodec | Deserialize/Serialize Buffers using Java Serialization. | N/A |
PassThroughCodec | Leave the Buffers untouched. | N/A |
StringCodec | Convert String to/from Buffer | N/A |
LengthFieldCodec | Find the length and decode/encode the appropriate number of bytes into/from Buffer | N/A |
KryoCodec | Convert Buffer into Java objects using Kryo with Buffers | com.esotericsoftware.kryo:kryo |
JsonCodec,JacksonJsonCodec | Convert Buffer into Java objects using Jackson with Buffers | com.fasterxml.jackson.core:jackson-databind |
SnappyCodec | A Compression Codec which applies a delegate Codec after unpacking/before packing Buffer | org.xerial.snappy:snappy-java |
GZipCodec | A Compression Codec which applies a delegate Codec after unpacking/before packing Buffer | N/A |
参考文献:
1. http://baike.baidu.com/link?url=kXnm3flViIx-4E7PxZtYVgb3xY5tlwovUqog2u_TgCCiN7FSFkxt7ze-Qio5j1FXPmIz2DGV2_lbOBoLeyXdaa
2. http://projectreactor.io/docs/reference/