原文地址:http://projectreactor.io/docs/reference/
Reactor是一个基础库,用在构建实时数据流应用、要求有容错和低延迟至毫秒、纳秒、皮秒的服务。
— Preface TL;DR
让我们大致了解一下Reactor。在你使用喜欢的搜索敲入一些关键词如Reactive、spring Reactive、Asynchronous java或者仅仅是" What the heck is Reactor? ".简而言之,Reactor是一个轻量级的JVM基础库,它可以帮助我们构建的服务和应用高效而异步的传递消息。
高效的含义是什么呢? 传递一个消息从A到B时GC产生的内存很小或者完全没有。 当消费者处理消息的速度低于生产者产生消息的速度时产生了溢出时,必须尽快处理。 尽可能的提供无锁的异步流。
据以往的经验来看,我们知道异步编程是困难的,特别是当一个平台提供了很多选项如JVM。
Reactor瞄准绝大部分场景中真正的无阻塞,并且提供了一组比原生Jdk的java.util.concurrent库更高效的API。Reactor也提供了一个可选性(不建议使用):
阻塞等待:如Future.get()。
Unsafe数据获取:如ReentrantLock.lock()。
异常抛出:如try ..catch ...finally
同步阻塞:如 syschronized
Wrapper配置(GC压力):例如 new Wrapper<T>(event)
让我们先使用一个纯正的Executor方法:
private ExecutorService threadPool = Executors.newFixedThreadPool(8); final List<T> batches = new ArrayList<T>(); Callable<T> t = new Callable<T>() { //1 public T run() { synchronized(batches) { //2 T result = callDatabase(msg); //3 batches.add(result); return result; } } }; Future<T> f = threadPool.submit(t); //4 T result = f.get() //5
1.分配回调方法---可能会导致gc压力。
2.Synchronization将强制对每个线程停止检查。
3. 存在消费者的消费能力低于生产者生产能力的隐患。
4. 使用线程池将task传递到目标线程--肯定通过FutureTask给gc造成压力。
5. 阻塞直至callDatabase()响应。
从上述的简单示例中,容易看出扩展性会受到严重的影响。
不断分配的对象将导致gc停止工作,特别是耗时比较多的大任务时。当一个gc停止工作时将会从降低全局的性能。
队列默认情况下长度是不受限制的。任务会堆积到数据库中。
后台日志不是一个内存泄露的地方,但是副作用就比较烦人了:在gc暂停工作时需要扫描更多对象;损失数据重要bit的风险;等等。
经典链接Queue分配节点时产生的内存压力。
使用阻塞方式应答请求时发生恶性循环。
阻塞方式应答导致生产者效率慢下来。实际上,因为需要提交更多任务时等待响应,流程变成了基本的同步方式。
同数据存储的通信异常将以不友好的形式传递到生产者,通过线程边界来分离工作,这使容错的协商变的比较容易。
完全的、真正的非阻塞比较难以实现---特别是有比较时髦名称的分布式系统中如微服务架构。然而,Reactor却没有妥协,它试图利用可用的最佳模式来使开发者不必觉得像是在写一个数学论文而仅仅是一个微服务(nanservice)。
没有什么比较光更快的了(除了流言蜚语和病毒猫视频),在某些方面,延迟是每个真实世界的系统必须关注的。为此:
反应器提供了一个框架,可以帮助你减轻恶心的延迟引起的副作用,在应用程序中使用最小的开销:使用一些灵活的结构,通过在启动时预先分配在运行时的分配数据结构来避免分配问题。
限制主消息传送结构,因而不会导致任务无限的累积。
利用流行的模式例如Reactive和事件驱动架构来提供一个包含应答的非阻塞的、端对端流。
实现了最新的Reactive流标准,通过不发送多于当前容量的请求来使受限的结构更有效率。
使用这些概念到进程间通信,提供了理解控制流的非阻塞IO驱动。
对开发者暴露功能API,帮助开发者使用一个无副作用的方式组织代码,也帮助你确定在什么场景下你是线程安全和具有容错性的。
该项目始于2012年,孕育时间较长。2013年出现Reactor1.x版本。该版本成功部署到不同的组织,不仅有开源组织如MeltDown、还有商业机构如Pivotal RTI。2014年我们实现了新的"Reactive流标准",并在2015年的4月开始了版本2.0的大规模重构目标。Reactive流标准拉近了分发机制的鸿沟:控制多少线程传递多少数据。
同时我们也决定重新调整我们的一些事件驱动和任务协调API的来应对日益流行、记录的reactive扩展。
Reactor由Pivotal赞助支持,有两个核心提交者。因为Pivotal同时也是spring框架的东家,我们的很多同事也是不同spring项目的核心贡献者,所以我们也提供从Reactor到spring的集成同时也支持spring框架的一些重要功能如spring消息模块的STOMP代理。也就是说,我们不会强迫仅仅想使用Reactor的人去适应spring。我们保留了一个大容量Reactive的内嵌工具。事实上,Reactor的目标之一是在你解决异步和功能性问题时保持公正的态度。
Reactor遵循 Apache 2.0 licensed ,可以通过 GitHub 获取。
使用要求:
Reactor需要jdk7及以上版本。
但完整的功能组合表达式需要java8的lambdas支持。
作为后备,支持spring clojure和groovy的扩展。
Reactor需要jvm支持Unsafe方式获取(如:android不支持)时才能表现最全的功能。
当Unsafe获取不支持是所有基于RingBuffer的特定将不能工作。
Reactor打包成传统的jar形式存在于maven中央库中,可以使用你喜欢的工具来拉取这个依赖包。
Reactor基本代码划分为几个子模块,这样你可以单独使用某一模块而抛弃不需要的模块。
下述是一些使用Reactor模块和其它混合的Reactive技术示例,完成异步目标:
Spring XD + Reactor-Net (Core/Stream) : 使用Reactor 作为Sink/Source IO 驱动.
Grails | Spring + Reactor-Stream (Core) : 使用Stream和 Promise作为后台处理程序。
Spring Data + Reactor-Bus (Core) : 生产数据库事件(Save/Delete/… ).
Spring集成Java DSL + Reactor Stream (Core) : Microbatch MessageChannel from Spring Integration.
RxJavaReactiveStreams + RxJava + Reactor-Core : Combine rich composition with efficient asynchronous IO Processor
RxJavaReactiveStreams + RxJava + Reactor-Net (Core/Stream) : Compose input data with RxJava and gate with Async IO drivers.
Reactive stream是一个新的标准,不同的厂商和技术组织包括Netflix,oracle,Pivotal,TypeSafe支持这个标准。该标准有望被java9或者以后的版本的标准收录进去。
该标准的目标是提供一种同步或者异步的具有控制流机制的数据序列。这个标准是轻量级的,第一个目标是JVM。它提供了4个java接口,一个Tck和一系列的示例。根据需要,4个java接口的实现非常直接,这个项目的内涵是Tck对操作的校验。
图三 Reactive stream协约
Reactive Streams接口 org.reactivestreams.Pubslisher: 数据源(从0到N个数据,n是任意的).它提供两个可选的中断事件:error和completion。 org.reactivestreams.Subscriber: 数据序列的消费者(从0到N个数据,n是任意的).它在初始化时接收一个Subscription,subscription获取Subscriber将要处理多少数据。 与其它数据时序信号交互的其它回调有: next (新消息)和可选的completion/error.
org.reactivestreams.Subscription:在初始化时传给Subscriber的一个小的追踪器。.它控制着我们准备消费掉多少数据和什么时候停止消费(取消).
org.reactivestreams.Processor: 既是Subscriber和Publisher的组件标识!
图四:Reactive Stream发布协约
通过传递Subscriber,一个请求数据从subscriber到publisher有两种方式: 无限制的: 在订阅时, 仅调用Subscription的request(Long.MAX_VALUE)方法. 有限制的: 在订阅时, 保留subscription的引用,并且当subscriber准备处理数据时调用request(long)方法。 通常, 在订阅时Subscribers将请求一组初始数据或者甚至1个数据。 然后,onNext认为执行成功(例如后面的Commit, Flush等等), 请求更多的数据。 建议使用线性组的请求。为避免请求重叠,例如每次下次请求时请求10个或者更多的数据。
表1 目前为止,Reactor直接使用的Reactive stream接口及实现
Reactive Streams | Reactor模块 | 实现 | 说明 |
---|---|---|---|
Processor | reactor-core, reactor-stream | reactor.core.processor.*, reactor.rx.* | 在core模块,提供了RingBuffer处理器,在stream模块,提供了一整组操作和Broadcaster。 |
Publisher | reactor-core, reactor-bus, reactor-stream, reactor-net | reactor.core.processor.*, reactor.rx.stream.*, reactor.rx.action.*, reactor.io.net.* | 在core模块,处理器继承了Publisher.在bus模块,发布一个不限制的路由事件,在stream模块,stream扩展直接继承Publisher. 在net模块,Chanel继承了Publisher来消费请求数据,同时也提供了具有flush和close的回调的providers. |
Subscriber | reactor-core, reactor-bus, reactor-stream, reactor-net | reactor.core.processor.*, reactor.bus.EventBus.*, reactor.rx.action.*, reactor.io.net.impl.* | 在core模块,处理器继承了Subscriber. 在bus模块,提供了无限制的Publisher/Subscriber能力.在stream模块,Subscribers计算特定的回调行为.在Net模块,subscriber的IO层实现处理写、关闭和flush. |
Subscription | reactor-stream, reactor-net | reactor.rx.subscription.*, reactor.io.net.impl.* | 在stream模块, 提供了一个优化过的PushSubscriptions和 buffering-ready ReactiveSubscription. 在Net模块, 使用自定义Subscription实现背压的方式实现异步IO读。 |
从reactor 2启动时我们就一直遵循这个标准,并且随着标准的改变而改变直到1.0.0正式版准备发布。现在可以通过maven中央库及其流行的镜像可以找到该标准,你将发现它作为过渡,依赖于reactor-core模块。
Reactive扩展或者通常称作Rx,是一种定义完备的功能api,这些api扩展了观察者模式到一个史诗的程度。
Rx模式支持实现了使用少数设计的关键字来处理Reactive 数据序列:
使用回调链来抽象实时及延迟:当可以获得到数据时调用。
抽象了一直使用的线程模式:同步或者异步仅仅是我们处理的Observable/Stream。
控制错误传递及停止:错误和完成信号及数据的有效负载信号传递到链中。
在多个预先定义的api中解决了多个扩展-聚合及其它组合问题。
Reactive扩展的标准Jvm实现是RxJava。它提供了一个功能丰富的Api。
Reactor 2 提供了一个特定模块实现了Reactive扩展的一部分功能。建议需要使用Reactive stream全部功能的用户使用RxJava。最后,当组合完整的RxJava系统时,用户可以从Reactor提供的强大的异步和IO的中获益。
表2:Rx和Reactor stream的不同点:
rx | reactor-stream | 说明 |
---|---|---|
Observable | reactor.rx.Stream | Reactive Stream Publisher的实现 |
Operator | reactor.rx.action.Action | Reactive Stream Processor的实现 |
Observable with 1 data at most | reactor.rx.Promise | 返回唯一结果的类型, Reactive Stream Processor实现并提供了可选的异步分发功能。 |
Factory API (just, from, merge… .) | reactor.rx.Streams | 和core模块的 data-focused 子类一样, 返回 Stream |
Functional API (map, filter, take… .) | reactor.rx.Stream | 和core模块的data-focused 子类一样, 返回Stream |
Schedulers | reactor.core.Dispatcher, org.reactivestreams.Processor | Reactor Stream计算无限制的共享Dispatcher或者有限的Processor的操作。 |
Observable.observeOn() | Stream.dispatchOn() | 只是dispatcher参数的一个适配命名。 |