在进行数据流处理过程中,需要一个高效苗条的流处理组件,比如对输入流能进行分组(窗口),能进行流量控制(Back Pressure - 背压),这也就涉及到响应式编程,流处理框架。这方面如果直接基于 Akka actor 来构建 Akka ActorSystem 也是比较复杂,依赖的组件也不少。还有构筑在 Akka actor 之上的 Akka Streams,再往上的 Flink Streaming,它们都有像滑动,滚动窗口的概念,但是依赖更不得了。一个基本的 Flink Streaming 的项目会依赖到 45 M 以上的第三方组件,如果用它来写一个数据流处理的共享组件,那真是要命。Spring 5 也开始带上了自己的 Reactive-Streams 实现 Spring Reactor, 想要把它从 Spring 中单独抽离出也非易事。
Flink Streaming 组件依赖:org.apache.fling:flink-streaming-java_2.12:1.80, 会依赖于其他诸如 akka-stream, akka-actor, flink-core, flink-clients, scala-library 等非常多的东西
而另一个著名的响应式框架 RxJava 2 就清爽多了,完全没有第三方依赖,要说有也就是定义了四个接口的 reactive-streams(2 KB 大小),就自身那个 rxjava-2.2.9.jar 包只有 2.3 M,这才叫轻量级。因为它设计来是能被应用于 Android 客户端应用的,Andriod 上的 rxandriod-1.2.1.aar 只有 9 K。所以 RxJava 2.x 太适合用来写一些小的共享组件了。
说了那么多,RxJava 是什么?直接网上抄一句定义:RxJava是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。它的并发不一定需要了解它的实现细节,RxJava 也支持像 Flink, Akka Streams 那样的窗口概念。RxJava 2.x 符合了标准的响应式流(Reactive-Streams) 规范,主要应用了观察者模式,编程中围绕着的概念有 Observable, Observer, Subscriber , Subject(既是 Observable 又是 Observer), Flowable, Publisher , Processor (既是 Subscriber 又是 Publisher) 等,其中标暗红的对象是来自于 Reactive-Streams 定义的概念。响应的编程还需了解类似于 Iterator 的 onNext, onError, onComplete 这三个基本事件。还有响应式编程通常会用的 Producer/Consumer, Source/Sink, Publisher/Subscriber,传递的数据元素通常称作 Subject。
下面用几段基本代码来体验 RxJava 2 来做什么
任何语言的 Hello World 代码都毫无意义的,RxJava 2 的也不例外,像下面这行
Observable.just("Hello World").subscribe(System.out::println);
效果和 System.out.println("Hello World")
一样,说明不了什么问题。再进一步观察一个集合
Observable.fromIterable(Arrays.asList("one", "two", "three")) .subscribe(s -> System.out.println("Hello " + s));
稍好一些,像是 forEach() 操作
来一个更多一点功能的代码,用 onNext, onError, onComplete 来控制数据
Observable.<String>create(emitter -> { emitter.onNext("Monday"); emitter.onNext("Tuesday"); // emitter.onError(new RuntimeException("Something wrong")); #1 emitter.onNext("Wednesday"); emitter.onComplete(); }).subscribe( day -> System.out.println("Day: " + day), //onNext t -> System.err.println(t.getMessage()), //onError () -> System.out.println("Done") //onComplete ); System.out.println("Program exit");
以上用 Java 8 Lambda 语法书写的 subscribe 中各个 onNext, onError, onComplete 函数参数,
如果没有 #1
行代码为被注释状态时的输出为
Day: Monday Day: Tuesday Day: Wednesday Done Program exit
如果 #1
代码启用的话,输出为
Day: Monday Day: Tuesday Something wrong Program exit
上面整个代码也是一个同步执行的过程,因为到目前为止还未引用新的线程,所以全部的输出操作都是在主线程上完成的,这就是为什么数据遍历完之后才输出 Program exit
。
如果只是单线程操作,那要 RxJava 的优势在哪儿,RxJava 支持并发,而且还把并发操作透明化,也就是我们不需要太了解它的线程模型的实现。但作为一个希望通过阅读源代码来更准备理解一个框架的人来说,透明化的东西也要对它的线程模型看个究竟。
首先来看一下以上代码各部分是由什么线程来执行的
Observable.<String>create(emitter -> { log("Producer"); emitter.onNext("Monday"); emitter.onComplete(); }).subscribe( day -> { log("Consumer-" + day); } ); log("Program exit");
log() 方法输出当前线程名与消息,实现代码为
private static void log(String msg) { System.out.println(Thread.currentThread() + ": " + msg); }
执行后输出为
Thread[main,5,main]: Producer Thread[main,5,main]: Consumer-Monday Thread[main,5,main]: Program exit
加上 observeOn(Scheduler)
, 指定一个观察者在哪个调度器上观察这个Observable
Observable.<String>create(emitter -> { log("Producer"); emitter.onNext("Monday"); emitter.onComplete(); }).observeOn(Schedulers.newThread()) //只有消息消费用这个线程(池), 生产者仍然是主线程 .subscribe( day -> { Thread.sleep(200); log("Consumer-" + day); } ); log("Program exit"); Thread.sleep(1000);
第 5 行加了 .observeOn(Schedulers.newThread())
让观察者在一个新的线程上去。Schedulers 中出来的各种 Scheduler 这里暂不展开,只要知道会在新的线程上去执行某件事情。
并且消费消息时加了一个延时,最后加上等待 1 秒钟,以确保主线程退出前,消息还被全部消费完成。看下现在输出为
Thread[main,5,main]: Producer Thread[main,5,main]: Program exit Thread[RxNewThreadScheduler-1,5,main]: Consumer-Monday
我们看到消息仍然是由主线程放进去的,消息放完后立即退到主线程上去了,消费消息在 observeOn()
指定的线程上。
再试一下 subscribeOn(Scheduler)
, 指定Observable自身在哪个调度器上执行
Observable.<String>create(emitter -> { log("Producer"); emitter.onNext("Monday"); emitter.onComplete(); }).subscribeOn(Schedulers.newThread()) //消息生产与消费都会有这个线程(池) .subscribe( day -> { Thread.sleep(200); log("Consumer-" + day); } ); log("Program exit"); Thread.sleep(1000);
直接看输出来理解
Thread[main,5,main]: Program exit Thread[RxNewThreadScheduler-1,5,main]: Producer Thread[RxNewThreadScheduler-1,5,main]: Consumer-Monday
我们看到只用 subscribeOn(Scheduler)
时,消息的产生与消费都在它指定的线程池上。
observerOn() 和 subscribeOn() 双管齐下
Observable.<String>create(emitter -> { log("Producer"); emitter.onNext("Monday"); emitter.onComplete(); }).subscribeOn(Schedulers.newThread()) //消息生产者用这个线程(池) .observeOn(Schedulers.io()) //消息消费者用这个线程(池) .subscribe( day -> { Thread.sleep(200); log("Consumer-" + day); } ); log("Program exit"); Thread.sleep(1000);
记住前面用 Schedulers.newThread()
的线程名称类似 RxNewThreadScheduler-1,5,main
, 现在来看新的输出
Thread[main,5,main]: Program exit Thread[RxNewThreadScheduler-1,5,main]: Producer Thread[RxCachedThreadScheduler-1,5,main]: Consumer-Monday
新的线程名 RxCachedThreadScheduler-1,5,main
就是 Schedulers.io()
所对应的,所以由上面的输出表明
subscribeOn() 只有第一次调用有效,因为 Observable 就一个,observeOn() 可以调用多次,每次调用都会影响到后续的 map, filter, subscribe 等操作。
Schedulers 中的工厂方法可以创建出不同类型的线程池,简单说明如下
RxJava 在 Java 7 之前就有了,所以它不依赖于 JDK 7 的 ForkJoinPool,解决了 JDK 8 的 CompletableFuture 之前的任务间的依赖问题。可以大概对比一下 RxJava 与 CompletableFuture 两种写法,不要太纠缠于细节,以下代码不是完全对等
RxJava 代码
Observable.just(request1, request2) .observeOn(Schedulers.io()) .map(Request::get) .subscribe(System.out::println);
CompletableFuture 代码
ExecutorService threadPool = Executors.newFixedThreadPool(10); Stream.of(request1, request2) .map(request -> CompletableFuture.supplyAsync(request::get, threadPool)) .forEach(completableFuture -> log(completableFuture.join()) );
以上两段代码并非用以说明哪种写法的好坏,只是纯粹的提供两种代码风格
前方的代码是加入消息消费前所有消息都准备好了,这对处理一个已经列表有用,更多的时候是先设置好消费者后,消息的产生是连续不断的。
PublishProcessor<String> restProcessor = PublishProcessor.create(); Observable.fromPublisher(restProcessor).subscribe(System.out::println); restProcessor.offer("Monday"); restProcessor.offer("Tuesday"); restProcessor.offer("Wednesday"); restProcessor.onComplete();
本文的内容比较杂,其余更多的话题有 RxJava 的背压和窗口的支持等。