Reactor更推荐,当然RxJava 2.x仍然是一个不错的选择,如果你使用Android,那么RxJava 2.x是你唯一的选择.
首先,从功能角度来看,RxJava和Reactor的两个版本都非常相似。如果您知道RxJava 1.x或2.x,Reactor将会很快非常熟悉,但您仍然需要了解差异。
其次, java.util.concurrent.Flow 类(一组接口,确切地说)是 反应流 规范的一部分,捆绑到JDK中。该规范规定各种反应库应该礼貌地表现并且干净地相互交互。然而,规范诞生于Java 9之前并引入Flow因此,库是基于外部的reactive-streams.jar,而不是JDK。当谈到RxJava / Reactor比较时,有很多观点。让我快速介绍一些差异。我假设您对这两个库都有一定的了解。
API
Flowable并且Flux有非常相似的API。显然,他们都支持,如基本的操作map(),filter(),flatMap(),以及更先进的。主要区别在于Java目标版本。RxJava 2.x必须仍然支持Java 6,因为它在Android上被广泛使用(稍后阅读)。另一方面,Reactor的目标是Java 8+。因此,Reactor可以利用现代(-ish,Java 8是5年前,在撰写本文时)API java.time和java.util.function。输入更加安全:
<b>import</b> java.time.Duration; ... flux.window(Duration.ofSeconds(1));
而不是:
<b>import</b> java.util.concurrent.TimeUnit; ... flowable.window(1, TimeUnit.SECONDS);
传递单个Duration实例比整数更容易,更安全。Reactor是直接对应CompletableFuture,Optional,java.util.stream.Stream等。
Reactor加1分
类型安全
谈到类型安全性,我真的很想念RxJava 1/2中引入的细粒度类型。
RxJava 2: Completable
Reactor:N / A
成功完成或失败,不产生任何值。类似CompletableFuture<Void>
RxJava 2: Maybe<T>
Reactor: Mono<T>
成功完成或失败,可能会也可能不会发出单个值。像异步一样Optional<T>
RxJava 2: Single<T>
Reactor:N / A
完成成功发射一个条目或失败。
RxJava 2: Observable<T>
Reactor:N / A
发出无限数量的事件(从零到无限),可选择成功完成或失败。由于其代表的事件来源的性质,不支持背压。
RxJava 2: Flowable<T>
Reactor: Flux<T>
发出无限数量的事件(从零到无限),可选择成功完成或失败。支持背压(当消费者无法跟上时,源可能会减慢)
Reactor中缺少某些类型并不意味着它不支持某些用例。如果需要Completable,可以使用笨拙的Mono<Void>(如Mono.then()操作符)。
Observable和Flowable之间的区别给你一个提示,你应该期待什么样的流量控制flow-control 。
RxJava 2也分开Observable和Flowable类型。如果源本身是无法控制的,我们可以在类型安全中表达Observable。但是Observable有些操作毫无意义或无法实施。没关系,使用另一Flowable,它具有完全的背压支持,这意味着它可以减速。
我可以轻松在Flowable和Observable之间转换,当消费者无法跟上生产者时,我该怎么办,但生产者不能放慢速度?删除额外的消息?暂缓一段时间?在Reactor中,两种类型的流都表示为Flux(如在RxJava 1.x中),因此您可能总是因为缺少背压而出现错误。在RxJava 2中,由于明确的保证,这种出错现象有点不太常见。
RxJava加1分。
检查异常
Reactor使用JDK中的标准函数类型,就像 Function 它的API一样。那很棒。但是这种选择的一个微小副作用是对转换中的已检查异常进行笨拙的处理。请考虑以下不编译的代码:
Flux .just(<font>"java.math.BigDecimal"</font><font>, </font><font>"java.time.Instant"</font><font>) .map(Class::forName) </font>
Class.forName()抛出检查ClassNotFoundException,遗憾的是,您不能从中抛出已检查的异常java.util.function.Function。另一方面,在RxJava中, io.reactivex.functions.Function 没有这样的约束,类似的代码可以编译得很好。无论您是否喜欢已检查的异常,一旦您必须处理它们,RxJava会让您的体验更加愉快。
RxJava加1分。虽然我不认为这是一个主要优势。
测试
两个库中调度程序的存在不仅允许对并发进行细粒度控制。调度程序在单元测试中也发挥着重要作用。在Reactor和RxJava中,您可以将基于挂钟的调度程序替换为基于人工虚拟时钟的调度程序。当您在时间过去测试流的行为时,这非常方便。定期事件,超时,延迟 - 所有这些都可以可靠地进行单元测试。
Reactor更进了一步。在RxJava中,您必须外部化每个调度程序的配置,以便您可以在单元测试中替换它。本质上还不错,你应该将它们外化。但是,当你需要测试时,它会很快变得混乱, TestScheduler 在您的生产代码中的几十个地方;另一方面,在Reactor中,它足以包围测试中的代码,并且所有底层调度程序都会被虚拟替换为自动神奇地替换:
StepVerifier .withVirtualTime(() -> Flux .never() .timeout(ofMillis(100)) ) .expectSubscription() .expectNoEvent(ofMillis(99)) .thenAwait(ofMillis(1)) .expectError(TimeoutException.<b>class</b>) .verify(ofSeconds(1));
此特定测试确保超时按预期工作。测试非常精确*且100%可预测。没有sleeping 或忙碌等待结果。与RxJava相比,优势在于无论您的流程有多复杂,所有调度程序都是stubbed存根的。在RxJava中,您可以编写类似的测试,但是您必须确保替换正在测试的代码中的所有调度程序TestScheduler。反应器方便地通过所有层注入虚拟时钟。
Reactor加1分
调试
Reactor添加了一个很棒的调试宝石:
Hooks.onOperatorDebug();
放置在应用程序开头的这条细线将跟踪信号在流中的流动方式。我们来看一个实际的例子吧。想象一下以下流:
<b>import</b> reactor.core.publisher.Flux; <b>import</b> reactor.core.publisher.Hooks; <b>import</b> reactor.core.publisher.Mono; <b>import</b> java.io.File; <b>public</b> <b>class</b> StackTest { <b>public</b> <b>static</b> <b>void</b> main(String[] args) { Mono<Long> totalTxtSize = Flux .just(<font>"/tmp"</font><font>, </font><font>"/home"</font><font>, </font><font>"/404"</font><font>) .map(File::<b>new</b>) .concatMap(file -> Flux.just(file.listFiles())) .filter(File::isFile) .filter(file -> file.getName().endsWith(</font><font>".txt"</font><font>)) .map(File::length) .reduce(0L, Math::addExact); totalTxtSize.subscribe(System.out::println); } } </font>
它发现所有的.txt文件下/tmp,/home和/404目录,并计算出它们全部的总规模。该程序在运行时失败,具有神秘的,长达一英里的堆栈跟踪:
java.lang.NullPointerException at reactor.core.publisher.Flux.fromArray(Flux.java:953) at reactor.core.publisher.Flux.just(Flux.java:1161) at com.nurkiewicz.StackTest.lambda$main$0(StackTest.java:16) at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:368) at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:244) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) at reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:126) at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:99) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:229) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53) at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59) at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63) at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121) at reactor.core.publisher.FluxFilter.subscribe(FluxFilter.java:49) at reactor.core.publisher.FluxFilter.subscribe(FluxFilter.java:53) at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) at reactor.core.publisher.MonoReduceSeed.subscribe(MonoReduceSeed.java:65) at reactor.core.publisher.Mono.subscribe(Mono.java:3695) at reactor.core.publisher.Mono.subscribeWith(Mono.java:3801) at reactor.core.publisher.Mono.subscribe(Mono.java:3689) at reactor.core.publisher.Mono.subscribe(Mono.java:3656) at reactor.core.publisher.Mono.subscribe(Mono.java:3603) at com.nurkiewicz.StackTest.main(StackTest.java:23)
如果你清理一下堆栈,你可能会感觉到看到了臭名昭着的NullPointerException:
at ...Flux.fromArray() at ...Flux.just() at com.nurkiewicz.StackTest.lambda$main$0(StackTest.java:16) ... at ...FluxArray.subscribe() at ...FluxMapFuseable.subscribe() at ...FluxConcatMap.subscribe() at ...FluxFilter.subscribe() at ...FluxFilter.subscribe() at ...FluxMap.subscribe() at ...MonoReduceSeed.subscribe() ... at com.nurkiewicz.StackTest.main(StackTest.java:23)
但它没有多大帮助,大多数堆栈跟踪指向Reactor源代码(你不想去那里)。在我们自己的代码中查看所述运算符的声明位置要方便得多。这是Hooks.onOperatorDebug()前面提到的堆栈跟踪旁边显示的内容:
Assembly trace from producer [reactor.core.publisher.FluxConcatMap] : reactor.core.publisher.Flux.concatMap(Flux.java:3425) com.nurkiewicz.StackTest.main(StackTest.java:17) Error has been observed by the following <b>operator</b>(s): |_ Flux.concatMap ⇢ com.nurkiewicz.StackTest.main(StackTest.java:17) |_ Flux.filter ⇢ com.nurkiewicz.StackTest.main(StackTest.java:18) |_ Flux.filter ⇢ com.nurkiewicz.StackTest.main(StackTest.java:19) |_ Flux.map ⇢ com.nurkiewicz.StackTest.main(StackTest.java:20) |_ Flux.reduce ⇢ com.nurkiewicz.StackTest.main(StackTest.java:21)
矛盾的是,我们对运行时抛出异常的地方不感兴趣。答案几乎总是:在Reactor勇气中。我们更愿意看看故障流是如何构建的。Reactor在这里是无与伦比的。调试反应式程序很难,真的很难。这个操作符使它更容易一些。顺便问一下你知道的来源是NullPointerException什么?来自JavaDocFile.listFiles():
如果发生I / O错误,则返回null。
返回... null ...如果...错误...发生。在二十一世纪。
Reactor加1分
Spring支持
您可以自由使用没有Spring框架的Reactor。您也可以在没有Reactor的情况下使用Spring框架。但事实恰恰相反,他们非常紧密地集成了Spring WebFlux(请注意名称)Mono并Flux广泛使用。Reactor加1分
Android开发
RxJava在Android开发人员中非常受欢迎。它非常干净地解决了两个问题:
这就是为什么RxJava仍然针对较旧的Java版本的原因之一。这可能会在未来发生变化,但在撰写本文时,RxJava是Android开发人员的唯一选择。它是一个坚固的库,所以我认为它们不会错过Reactor。为RxJava +1
成熟
RxJava在市场上更加成熟和完善(参见:Android)。此外,有许多独立项目选择RxJava作为其API,例如,官方 Couchbase驱动程序 。对于MongoDB也是如此,但他们从RcJava驱动程序转移到更兼容RxJava和Reactor的通用 反应流驱动程序 。这同样适用于 RxNetty 的弟弟 reactor-netty 。RxJava书籍的数量也大大超过了Reactor的数量。因此,暂时为RxJava +1,但这很可能会在未来几个月内发生变化。
总结
我没想到,但事实证明我们有一个平局。然而,展望未来,Reactor肯定更有前途。它的 表现似乎更好 ,开发更活跃,并由更大的玩家(Pivotal)支持。这些库非常相似(至少从API的角度来看),但如果你做出选择,Reactor可能会更好地为你服务。