自此文章起,逐层迈入RxJava2源码世界,探索Rx思想。 此前,需要对Rx有简单了,起码曾使用过。对于必要的证明外,不贴出具体案例。文章分成若干篇讲解,确保理解清晰,结构明了,此为开篇。
本篇文章将说明:
Rx的出现,是为了以可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序(引自官方文档)。简要得说,Rx的编程思想在以于,能以保证调用链不断裂的前提下,完成一系列事件的处理。一者,主流程、分支流程以及异常流程的处理均处于链上,能让使用者将目光聚焦于主要事件流程,即主流程。而分支流程、异常流程的处理同样包含在调用链中。这样做的好处是,当异常出现时、当需求更变时、当逻辑复杂时,得以快速定位场景,特别是涉及到异步处理的场景。二者,链上所有的事物均是可被观察的对象,一个可被观察的对象,可被指定的观察者捕捉到,进而进行响应,如此,各司其职,各自处理份内事务,共同协作。
对于上述,用一张图表示如下:
Rx致力于用一条调用链解决上图中所表达的流程问题,即在一条线性的流中,将目光集中于ABCD这一主流程并到达正常出口,同时,包含了异常流程、分支流程的处理以及所达到的出口。需要注意的是,在ABCD任意节点都可能出现异常需要直接到达异常出口,或者出现分支情况需要处理并回归到正常出口,那么这之间的承接、协作、中断都需要制定良好的规则与利弊权衡来保证链条的正常运转与不断裂。在大部分的场景中,Rx做到了,而且很优雅。
先说第一点Rx无偏见。Rx对于并发性或异步性无偏好,对它来说,所有的交互方式均被当成异步情况来处理。交互是响应式的,是一个主动通知、反应的过程。在异步事件到来时,事件能被主动推送到目的地进行处理,而不是由特定的目标期盼异步事件的到来。
第二点,Rx可插拔。借用上一张图,ABCD完成各自的需求,进而将事件继续传递下去,某一天,C中的分支情况不会再出现了,将C拔出,流程变成了ABD,整体也还是正常运转的;或者说在B、C之间,需要做进一步的校验,增加了G,那么,将G插入BC间,同样保证整体的正常运转。这种可插拔的运作方式的好处在于,定位快、精准,第一时间到达案发现场。
第三点,逻辑清晰。逻辑清晰是能将一系列的事件骨干部分暴露出来,快速地了解事件正常地从发生到结束的过程的样貌,而每一节点具体情况的处理以及其他情况的处理,被剥离到其他地方候命。当场景变更时,直接定位目标节点进行处理即可。可见,二、三点,是相铺相成的。
对于使用者来说,以上是比较亲密的,职责也比较明了。Observable描述了将会发生的事件,Observer声明了感兴趣的事件并规划如何响应,Emitter则决定了事件如何发生。Disposable用来决裂Observable与Observer间的关系,使不再产生响应。
在Rx的机制里,有两种流在流动——事件流与数据流。
数据流描述的是数据从上游流入下游的过程(可简单将Observable视为上游,Observer视为下游,以便理解)。此过程中,数据将被进行各种处理,如下图:
事件流是指在数据流动的过程中,发生的额外事件,比如,线程切换、结合、过滤等(当然,其中的以数据为主角的事件也可以看是是数据流,两者有交集,不冲突)。如下图:
(上两图均借鉴官方文档弹珠图)
从以上两图看,数据自上游到达下游的过程中,经历了数据流的传递与事件流的变迁,最终下游将拿到一个合适产物并进行响应。期间,无论是何种具体的传递与变迁,都保证了整个链条的联系。换句话说,从起点到终点,经历一系列的有差异的变化过程(可指数据状态、事件状态、上下文环境等)。
以上即是对于Rx的简要介绍
不得不提的是,Rx本身非常庞大,一蹴而就掌握整个框架思想是很难的,须卜丝抽茧,层层递进。这里仅关注除却所有额外功能外的Rx,基本流程的样貌,权当开胃菜。
案例如下
Observable<Integer> ob = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onComplete(); } }); ob.subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " ); } @Override public void onError(Throwable e) { } @Override public void onComplete() { Log.d(TAG, "onComplete: "); } }); 复制代码
以上代码完成了观察者与被观察者的订阅关系,具体的事件描述与发生、具体响应均包含其中。发射器发射了两个数据,并发送完成信号,下游将会进行对应响应。
首当其冲,必先看Observable。对于每个Observable,存在以下关系
public abstract class Observable<T> implements ObservableSource<T> 复制代码
显而易见,ObservableSource规定了Observable所做之事。在ObservableSource中,仅声明一个函数。
void subscribe(@NonNull Observer<? super T> observer); 复制代码
subscribe()所做之事为,与观察者签订。Observable提供了默认的实现,除了检查以及捕捉各类空异常外,核心为将Observer交接给当前Observable,触发subscribeActual(observer)的执行。
protected abstract void subscribeActual(Observer<? super T> observer); 复制代码
subscribeActual()十分重要,它作为媒介将Observable与观察者连接。对于RxJava中大量的Observable,因职责不同,需进行不同的处理,而差异部分,以subscribeActural()为引子,层层引爆。
其实,subscribe()与subscsribeActual()是相互协作的,前者负责通知、交接,后者负责执行差异。但是在这篇文章中,暂不深入两者协作关系的始末,且聚焦基本流程。
对于观察者来说,是Rx提供的一套制止,以待目标事件到来时,以某种方式进行响应。而在Rx里,观察者的核心行为有四种:
onSubscribe()一般来说,触发时机在subscsribeActual()中,并携带来自Observable的Disposable交给观察者,并在其中做出响应。当前状态为,数据未到达,契约已签订。而Disposable是个重要角色,稍后提及。
onNext()新的事件或数据到达,进行响应。但是当onError()或onComplete()响应之后,onNext()不再对后续事件或数据进行响应。
onCompelete()此响应的触发,预示着整个流程的正常完结,与onError()互斥。
onError()遭遇错误时响应,是Rx提供的一般异常出口,在不对异常事件进行特殊处理时,异常均能到达此处(不包括Rx限时的不做出传递的异常)。换句话说,API的使用者,总能在此处接收到Rx链上的异常。那么多的异常,都到达这里,是否会出问题?答案是,可能会,因为所有的异常均以接收到Throwable为判断依据,难免有失偏颇。Rx为了保证调用链的不断裂,做出了一些牺牲,这也是其中一部分。幸运的是,对大部分场景来说,在这里处理异常,是足够的。当需要处理比较棘手比较特定的异常时,解决办法也是有的,后续文章再加以说明。注意,onError()与onCompelete()互斥。
综上,观察者对每一种可能的事件进行了对应的响应。当然,Observer的职责并不仅仅于此,当涉及到Rx机制所需时,还需要承担更多的责任,下篇文章再细说。
Disposable的存在重要且必要,为什么?一言蔽之,切断联系。也就是说,当Observable与观察者签订了之后,也就产生了联系,互动亦随之而来。但转念一想,观察者变心了呢,不想和Observable互动了呢,怎么办?Disposable的存在就是解决了这个问题,以他为媒介,替观察者告诉Observable:“你是个好人,但是我们不合适,再见”。在此之后,观察者不再对事件进行响应,很容易想到的场景比如内存泄露,不多言。
Disposable仅有两种核心行为:
说到发射器,我是拒绝的,因为在平日里,并没有用到多少发射器。发射器的出现,大多是因为需要提供一种方式,让API的使用者动态地向下游发射数据,数据的发送逻辑,由发射器控制,但最终通过Observer收放。
对于其他的场景来说,Observer自己处理了数据与事件的逻辑,且数据的产生并不依靠发射器,这些在下篇文章里会看到。
简单来说,发射器声明了一些与Observer类似的行为,以确保响应时机的一致性。理所当然的是,发射器将先进行响应,Observer才可能进行后续响应,没错,是可能,因为Emitter提前获知Disposable的切断联系的信息,不再将事件进行传递。
回到案例。案例里通过Observable.create()创建了Observable并与观察者进行了签订。在Rx中,调用链上的环节,都将进行必要的检查,如非空检查,进而拿到一个合适的Observable进行后续操作。当前情况是拿到的是ObservableCreate。
紧接着,ObservableCreate通过Observable.subscribe()与观察者进行签订,机制开始运转。
之前说过,在Observable与观察者连接之后,由subscribeActual()来执行具体逻辑来处理,如下
@Override protected void subscribeActual(Observer<? super T> observer) { // 发射器与观察者进行绑定 CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 观察者进行响应 observer.onSubscribe(parent); try { // 交接发射器 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } 复制代码
上面代码中的source,是Observable.create()时传来的ObservableOnSubscribe. 当前的subscribeActual()里,发射器获知观察者的信息,然后观察者以onSubscribe()对签订事件进行了响应。最后,调用source.subscribe()将发射器传递到外部,由外部自行产生数据与事件,由此完成了签订过程。
对于拥有发射器的情况,将由发射器产生数据并进行推送。由前文所知,发射器声明的行为会与Observer保持一致,包括onNext()、onError()、onComplete()。在发射器发射后,相应的行为将会发射到观察者的相应响应位置,但在递交给观察者前,会对Disposable进行检查。当前发射器为ObservableCreate.CreateEmitter,下面列出onNext(),抛砖引玉,其余代码自行查看。
@Override public void onNext(T t) { // 不支持空数据 if (t == null) { // 转向异常出口 onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { // 状态满足,下游才进行响应 observer.onNext(t); } } 复制代码
以上,就完成了简单的从Observable的创建,观察者签订,发射器推送数据和事件,观察者进行响应的过程。小结如下:
流程如下图
目前来说,基本流程很简单,无非是观察者模式下的对事件的响应。但是,在下一篇文章,涉及到调用链的核心,需要不断对这一过程进行理解。