我从事Akka Streams的Scala项目已经有很多年了,我对需要提防的事情有相当好的感觉。在我当前的项目中,我们正在使用Java,并且正在使用Reactive Streams Specification的Reactor的实现。在学习该库包时,我偶然发现了许多常见的错误和不良做法,这些我将在这里列出。感谢 Enric Sala 指出了这些不良做法。
反应流
首先,让我们看一下Reactive Streams规范,看看Reactor如何映射实现它。规则非常简单:
<b>public</b> <b>interface</b> Publisher<T> { <b>public</b> <b>void</b> subscribe(Subscriber<? <b>super</b> T> s); } <b>public</b> <b>interface</b> Subscriber<T> { <b>public</b> <b>void</b> onSubscribe(Subscription s); <b>public</b> <b>void</b> onNext(T t); <b>public</b> <b>void</b> onError(Throwable t); <b>public</b> <b>void</b> onComplete(); } <b>public</b> <b>interface</b> Subscription { <b>public</b> <b>void</b> request(<b>long</b> n); <font><i>// back pressure happens here</i></font><font> <b>public</b> <b>void</b> cancel(); } </font>
Publisher是一个潜在的数据源。人们可以订阅Publisher一个Subscriber,一个订阅Subscription传递到一个Subscriber,订阅Subscription是来自Publisher的要求。这是反应流的核心原理,这个订阅要求是控制数据是否通过的关键。
对于Reactor,您需要处理两种基本类型:
订阅Subscription的实现方法是以阻塞方式使用 :Mono或Flux,这是一种堵塞方法的变体,例如可以使用订阅来注册一个lambda,这将返回Disposable,可用于取消订阅的类型。有一个CoreSubscriber实现Subscriber接口的类型,但这更像一个内部API,作为库包的用户,您实际上不必直接使用它。
好了,足够的理论。让我们深入一些代码。在下面,我将列出10个潜在使用订阅时有问题的代码段。有些将是完全错误的,而另一些则更像是不良习惯或气味。你能发现他们吗?
#1: Whoop Whoop Reactive!
开始试用Mono的简单应用:
<b>interface</b> Service { Mono<Void> update(String s); } <b>class</b> Foo { <b>private</b> <b>final</b> Service service; <b>void</b> problem() { service.update(<font>"foo"</font><font>); } } </font>
在我们的problem方法中,我们正在调用一个update返回的方法Mono<Void>。这是一个空白,因为我们并不真正在乎结果,所以这里可能出什么问题了?
好吧,该update方法实际上根本不会执行。还记得Subscription要求决定了数据流是否可以通过吗?这是由Subscription控制的。在此代码段中,我们根本没有订阅Mono,因此将不会执行。
解决方法非常简单。我们只需要使用终端操作,例如block或的subscribe变体之一。
<b>interface</b> Service { Mono<Void> update(String s); } <b>class</b> Foo { <b>private</b> <b>final</b> Service service; Mono<Void> problem() { <b>return</b> service.update(<font>"foo"</font><font>); } } </font>
我们可以将Mono传递给problem方法的调用者。
#2: Reactive + Reactive = Reactive
看看组合reactive方法:
<b>interface</b> Service { Mono<String> create(String s); Mono<Void> update(String s); } <b>class</b> Foo { <b>private</b> <b>final</b> Service service; Mono<Void> problem() { <b>return</b> service .create(<font>"foo"</font><font>) .doOnNext(service::update) .then(); } } </font>
我们首先调用create,然后使用doOnNext来对该update方法进行调用。then()调用可确保我们返回一个Mono<Void>类型。应该没事吧?
在这种情况下update方法也不会执行,可能会让您感到惊讶。使用doOnNext或任何doOn*方法均不订阅发布者。
#3:订阅所有发布者!
太酷了,我们知道如何解决这个问题!只需订阅内部发布者,对不对?
<b>interface</b> Service { Mono<String> create(String s); Mono<Void> update(String s); } <b>class</b> Foo { <b>private</b> <b>final</b> Service service; Mono<Void> problem() { <b>return</b> service .create(<font>"foo"</font><font>) .doOnNext(foo -> service.update(foo).block()) .then(); } } </font>
这实际上可能有效,但是内部订阅不会很好地传播。这意味着作为problem方法返回的订阅者,我们没有任何控制权。
使用doOn*有副作用的地方:例如记录日志,上传指标。
为了正确修复此代码并传播内部订阅,我们需要使用其中一种:map。flatMap折叠内部Mono并组成单个流。我们也可以删除then()调用,因为flatMap将已经返回内部发布者的类型:Mono<Void>。
<b>interface</b> Service { Mono<String> create(String s); Mono<Void> update(String s); } <b>class</b> Foo { <b>private</b> <b>final</b> Service service; Mono<Void> problem() { <b>return</b> service .create(<font>"foo"</font><font>) .flatMap(service::update); } } </font>
#4:我不太明白……
您准备好再来一个吗?
<b>interface</b> Service { Mono<Void> update(String s); } <b>class</b> Foo { <b>private</b> <b>final</b> Service service; <b>void</b> problem() { <b>try</b> { service.update(<font>"foo"</font><font>).subscribe(); } <b>catch</b> (Exception e) { e.printStackTrace(); } } } </font>
这次对由update方法返回的结果进行Mono处理。它可能会抛出一个错误,因此我们应用防御性编程并将该调用包装在try-catch块中。
但是,由于该subscribe方法不一定会阻塞,因此我们可能根本不会捕获该异常。简单的try-catch结构对(可能)异步代码没有帮助。
要解决此问题,我们可以block()再次使用,subscribe()或者可以使用一种内置的错误处理机制。
<b>interface</b> Service { Mono<Void> update(String s); } <b>class</b> Foo { <b>private</b> <b>final</b> Service service; <b>void</b> problem() { service.update(<font>"foo"</font><font>).onErrorResume(e -> { e.printStackTrace(); <b>return</b> Mono.empty(); }).subscribe(); } } </font>
您可以使用任何一种onError*方法来注册“错误钩子”以将错误返回发布者。
#5:看着我
让我们看一下以下片段
<b>interface</b> Service { Mono<String> create(String s); Mono<Void> update(String s); } <b>class</b> Foo { <b>private</b> <b>final</b> Service service; Mono<Integer> problem() { <b>return</b> service.create(<font>"foo"</font><font>).map(foo -> { service.update(foo).subscribe(); <b>return</b> foo.length(); }); } } </font>
在这里实现的是订阅更新和变换结果为Mono<Integer>。因此,我们使用map操作来获取字符串foo的长度。
尽管update将在某个时刻执行,但我们还是不会传播内部订阅,类似于陷阱3。内部订阅已分离,我们无法对其进行控制。
更好的方法是使用flatMap,然后使用thenReturn转换结果。
<b>interface</b> Service { Mono<String> create(String s); Mono<Void> update(String s); } <b>class</b> Foo { <b>private</b> <b>final</b> Service service; Mono<Integer> problem() { <b>return</b> service.create(<font>"foo"</font><font>).flatMap(foo -> service.update(foo).thenReturn(foo.length()) ); } } </font>
您是否开始小心翼翼使用订阅呢?大多数时候不是。有一些潜在的用例需要小心:
#6:不要指望它……
下一个可能会很棘手:
<b>interface</b> Service { Flux<Integer> findAll(); } <b>class</b> Foo { <b>private</b> <b>final</b> Service service; Flux<Integer> problem() { AtomicInteger count = <b>new</b> AtomicInteger(); <b>return</b> service.findAll() .doOnNext(count::addAndGet) .doOnComplete(() -> System.out.println(<font>"Sum: "</font><font> + count.get())); } } </font>
在这里,我们只是使用一个doOnNext运算符来累加流过的所有数字,并使用doOnComplete运算符在完成流时打印出结果总和。我们使用一个AtomicInteger来保证线程安全的增量。
problem().block()一次甚至多次调用时,这似乎可行。但是,如果我们problem()多次订阅结果,则结果将完全不同。此外,如果由于某种原因而使下游订阅续订,则该计数也将关闭。发生这种情况的原因是,我们正在向发布商之外收集状态。在所有订户之间存在共享的可变状态,这是一种非常难闻的气味。
正确的方法是 将状态的初始化推迟到发布者,例如通过将状态也包装在发布者中Mono。这样,每个订户都会拥有自己的数量。
#7:关闭,但没有雪茄
下一个也有类似的问题。你能发现吗?
<b>abstract</b> <b>class</b> UploadService { <b>protected</b> Mono<Void> doUpload(InputStream in); Mono<Void> upload(InputStream in) { doUpload(in).doFinally(x -> in.close()); } } <b>class</b> Foo { <b>private</b> <b>final</b> UploadService service; Mono<Void> problem(byte[] data) { <b>return</b> service.upload(<b>new</b> ByteArrayInputStream(data)) .retry(5); } }
在这里,我们尝试上载输入流,在UploadService使用完doFinally运算符后将其关闭。为了确保我们成功完成上传,我们希望使用retry操作员对任何失败重试五次。
当重试开始时,我们将注意到输入流已经关闭,并且所有重试将用来耗尽IOException。与前面的情况类似,我们在此处处理发布者外部的状态,即输入流。我们正在关闭它,通过使用doFinally运算符来更改其状态。这是我们应避免的副作用。
解决方案再次是将输入流的创建推迟到发布者。
#8:不给糖就捣蛋
以下问题可能是十个问题中最微妙的一个,但仍然值得一提:
<b>interface</b> Service { Flux<String> findAll(); Mono<Void> operation(String s); } <b>class</b> Foo { <b>private</b> <b>final</b> Service service; Flux<Void> problem() { <b>return</b> service.findAll() .flatMap(service::operation); } }
乍看之下,我们在这里所做的一切都是正确的。我们将使用flatMap组合两个发布者。
这段代码可能会起作用,但是值得了解幕后发生的事情。虽然flatMap看起来像一个简单的转换器,类似于API之类的集合,但在Reactor中,它是一个异步运算符。内部发布者将被异步订阅。这导致不受控制的并行性。根据我们Flux<String> findAll()将发出的元素数量,我们可能会启动100个并发子流。这可能不是您想要的,我认为Reactor API应该对此更加明确,如果不禁止这样做的话。
例如,使用Akka Streams甚至不可能。相应的运算符被显式调用mapAsync,它在此处清楚地指示您正在处理并发执行。此外,它严格要求您通过传递并行度整数参数来明确限制并发性。
幸运的是对于flatMapReactor中有一个重载,您也可以配置并行性。
<b>interface</b> Service { Flux<String> findAll(); Mono<Void> operation(String s); } <b>class</b> Foo { <b>private</b> <b>final</b> Service service; Flux<Void> problem() { <b>return</b> service.findAll() .flatMap(service::operation, 4); <font><i>// parallelism=4</i></font><font> } } </font>
通常,您甚至根本不需要并行处理。如果只想同步组成两个流,则可以使用concatMap运算符。
<b>interface</b> Service { Flux<String> findAll(); Mono<Void> operation(String s); } <b>class</b> Foo { <b>private</b> <b>final</b> Service service; Flux<Void> problem() { <b>return</b> service.findAll() .concatMap(service::operation); } }
#9:我的流泄漏了
差不多好了。在编写反应式代码时,有时必须与非反应式代码集成。这是以下片段的内容。
<b>interface</b> Service { Flux<String> findAll(); } <b>class</b> Foo { <b>private</b> <b>final</b> Service service; Iterable<String> problem() { <b>return</b> service.findAll().toIterable(); } }
这段代码几乎太简单了。我们正在处理Flux <String>,但是我们不希望我们的API公开这个Flux反应类型。因此,我们正在使用内置toIterable方法将Flux流转换为Iterable<String>。
虽然这可能会产生预期的结果,但Iterable以这种方式将Reactor流转换为却是一种气味。Iterable不支持流关闭,因此发布者将永远不知道订阅者何时完成。坦白说,我不明白为什么toIterable它甚至是流API的一部分。我认为我们应该避免它!
替代方法是java.util.Stream,使用toStream方法转换为较新的API 。这确实支持整齐地关闭资源。
<b>interface</b> Service { Flux<String> findAll(); Mono<String> lookup(String s); } <b>class</b> Foo { <b>private</b> <b>final</b> Service service; Stream<String> problem() { <b>return</b> service.findAll().toStream(); } }
#10:我不想结束
如果您走了这么远,恭喜!您可能不希望这样结束,如下面的代码片段所示:
<b>interface</b> Service { Flux<String> observe(); Mono<Void> save(String s); } <b>class</b> Foo { <b>private</b> <b>final</b> Service service; <b>void</b> longRunningProblem() { service.observe() .flatMap(service::save, 10) .subscribeOn(Schedulers.elastic()) .subscribe(); } }
在这里,我们一直希望观察一个流,并在流过每个元素时保存它们。这将是一个潜在的无休止的流,因我们不想阻塞主线程。因此,我们正在使用subscribeOn运算符来订阅Scheduler的弹性方法。Scheduler调度程序动态创建ExecutorService,基于工作程序并缓存线程池以供重用。最后,我们调用subscribe()以确保将执行流。
这里的问题是,通过保存创建的上游观察者或内部发布者中的任何失败都将导致流终止。我们缺少错误处理程序或重试机制。
可以
<b>interface</b> Service { Flux<String> observe(); Mono<Void> save(String s); } <b>class</b> Foo { <b>private</b> <b>final</b> Service service; <b>void</b> longRunningProblem() { service.observe() .flatMap(service::save, 10) .doOnTerminate(<b>this</b>::longRunningProblem) <font><i>// start over on terminate</i></font><font> .subscribeOn(Schedulers.elastic()) .retry() </font><font><i>// retry indefinitely</i></font><font> .subscribe(); } } </font>
结论
因此,经验教训。如果您可以从中学到一些东西,那就是以下几点
不要对其他发布者做任何假设
不要对其他订户做任何假设