上一篇文章,对Rx进行了梳理,大致了解了什么是Rx,并对RxJava的基本流程进行了跟踪,如忘记其中关键点,及时回顾。
传送门
从Rx的机制上来说,事件的持续进行运转于调用链之上,也就是说,调用链上的每一环节,承载了各自的任务,并对事件的最终完成或异常状态提供了对应出口。那分门别类的任务,是如何嵌入调用链中,并得以完成的呢? 此篇文章,将对此进行解析。
在解析之前,先对基本流程进行回顾:
Observable<Integer> ob = Observable.fromArray(new Integer[]{1,2,3,4,5,6}); // 数据 ob.filter(new Predicate<Integer>() { // 过滤 @Override public boolean test(Integer integer) throws Exception { return integer % 2 == 0; } }).map(new Function<Integer, String>() { // 数据转换 @Override public String apply(Integer integer) throws Exception { return integer.toString(); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { // 响应 Log.d(TAG, "onNext: " + s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { // 响应 Log.d(TAG, "onComplete: "); } }); 复制代码
从上游向下游发射了6个数据,过滤掉不满足过滤条件 integer % 2 == 0 的数据,并将过滤后的数据转换为String类型,下游最终拿到3个数据,进行了4次响应。 这一调用链中,进行了两个额外任务——过滤、转换,但却能保证最终到达下游,那,如何完成?
从基本流程看,第一步,拿到Observable。从每次调用的返回看,包括Observable.fromArray(),Observable.filter(),Observable.map()均会拿到Observable,但,Observable会有不同。分别进行跟踪,在当前情况下,拿到的Observable依次如下:
// Observable.filter() public final Observable<T> filter(Predicate<? super T> predicate) { ObjectHelper.requireNonNull(predicate, "predicate is null"); return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate)); } // Observable.map() public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); } // Observable.fromArray() public ObservableFromArray(T[] array) { this.array = array; } Observable.filter() public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) { super(source); this.predicate = predicate; } Observable.map() public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { super(source); this.function = function; } 复制代码
除了fromArray()外,均接收一个ObservableSource参数,并返回一个Observable。而接收到的ObservableSource,就是调用当前接口的Observable , 显然,Observable被装饰了,并且是层层装饰。 注意到,在Rx提供的类似fromArray(),比如前文的Observable.create()的一系列方法,接收参数并不遵守这一规则。原因是,这一系列接口所提供的Observable所处位置比较特别,这类Observable处于事件的的发源地,作为接收数据的源头,而数据的来源方式,千差万别,因此这类Observable的构造方式也大不相同。
这里需插播一个信息,即Observable.map()与Observable.filter()这一类操作所拿到的Observable被标示为AbstractObservableWithUpstream,取map()来看,如下
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> 复制代码
AbstractObservableWithUpstream的作用在于,指明此类Observable,将会对所包装的Observable进行额外处理。 其实,Observer也会有相应的行为,下面再谈。
回到当前,说到经过一系列的调用之后,形成了一条调用链,而拿到的Observable是被层层包装的,因此,实际拿到的是一条包装链,如下图:
案例中,到Observable.map()为止,上游链条建立完毕,由外至内为ObservableMap -> ObservableFilter -> ObservableFromArray。紧接着,与观察者签订。
与上游对应的,其实在签订之后,下游也会形成链条的。 再次回忆之前所说的基本流程:
此刻,拿到了最外层为ObservableMap的上游链条(最后调用为Observable.map()),因此,实际与案例里定义的观察者签订的Observable为ObservableMap。在第一篇讲解中说过,subscribe()会将Observer交给subscribeActual(),由subscribeActual()来执行具体的逻辑。
// map()拿到的ObservableMap @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } // filter()拿到的ObservableFilter @Override public void subscribeActual(Observer<? super T> s) { source.subscribe(new FilterObserver<T>(s, predicate)); } // fromArray()拿到的ObservableFromArray @Override public void subscribeActual(Observer<? super T> s) { FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array); s.onSubscribe(d); if (d.fusionMode) { return; } d.run(); } 复制代码
可以看到,从最外层的Observable开始,不断地执行subscribe() -> subscribeActual() -> subscribe() -> subscribeActual() ... 直到达到最内层Observable的 subscribeActual()。 期间通过subscribe()交接的Observer也经历了对应的包装变化。Observer的包装次序为 自定义的Observer <- MapObserver <- FilterObserver 。 除最内层的Observer外,其他的Observer均以另一个Observer为构造参数,以便装饰,可通过跟踪任意Observer源码看出,此不贴出。
当前,下游也形成了一条调用链。上下游的调用链如图:
在到达最内层的Observable之后,Observer链构建完毕,并且最内的Observable将Observer链的最外层以Disposable形式交给API的使用者, 以上执行此承操作的代码为 s.onSubscribe(d)。
以上上下游链条的构建过程需要仔细琢磨,很重要。
到此处为止,RxJava的调用链式构造完毕了的。那么,为什么要这样做呢?
一句话回答:“处理差异性需求。”
在Rx中,创建、过滤、变换、结合、辅助操作等,提供的以操作符形式的让得以以链条形式完成一系列工作的行为,均是为了处理差异性需求,在本文案例中表现为fromArray(),filter(),map()。而对于种种的差异性需求来说,先以上游或下游为大环境,再以其中某一节点为具体场景进行处理。因此需要分别在上下游分别建立相应的调用链,并构建出场景明确的链条节点,以此达到Rx完成需求的目的。
在有了双链并确定了节点场景之后,处理差异性需求就简单了,仅需在当前场景处理响应事件之后,让事件继续流动向下一节点皆可。如图
当事件或数据来到当前节点时,一般会遵循如图顺序流动。
回到案例。当前情况,已在上下游构建好相应的调用链,而在最内的Observable里,发射了数据
// ObservableFromArray @Override public void subscribeActual(Observer<? super T> s) { FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array); s.onSubscribe(d); if (d.fusionMode) { return; } // 发射数据 d.run(); } // FromArrayDisposable void run() { T[] a = array; // 要发射的数据 int n = a.length; for (int i = 0; i < n && !isDisposed(); i++) { T value = a[I]; if (value == null) { actual.onError(new NullPointerException("The " + i + "th element is null")); return; } // actual为最外的Observer,为FilterObserver actual.onNext(value); } if (!isDisposed()) { actual.onComplete(); } } 复制代码
在数据开始推送后,下游链的每个节点,依次接收到数据事件的到来,并进行相应的处理。根据之前的分析,当前案例的下游链为自定义的FilterObserver -> MapObserver -> 自定义的Observer 。 对应代码如下
// FilterObserver @Override public void onNext(T t) { if (sourceMode == NONE) { ...... if (b) { // 将满足过滤条件的数据发送下一节点,为MapObserver actual.onNext(t); } } else { actual.onNext(null); } // MapObserver @Override public void onNext(T t) { ...... try { // 将数据t进行转换 v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } // 将数据v发送给下一节点,为自定义的Observer actual.onNext(v); } 复制代码
当事件到达调用链的某一节点,此节点将进行对应的处理,并根据需求决定事件应如何收发,而在最终的节点里,将会收到经历了所有变迁的事件。
注意到,在Observable.fromArray()、Observable.filter()、Observable.map()等操作符里,需要携带对应的参数,如Function、Predicate等。此类参数的作用为,携带链上节点所需信息,协助构建节点或协助处理事件,但不为本文重点,故不提及。
Rx中,为了在链式表达中完整一系列事件需求,分别在上下游构建了对应的调用链。
事件到来时,节点将会进行响应,并根据自身特点在合适的时机将事件推送给下一节点。通过上下游调用链的运行机制,能让Rx得以以链条不断裂的前提下,完成复杂的事件交互,将事件的流程主干铺陈出来,剥离异常与分支流程。而Rx可插拔的运作方式也体现在节点的删减中。
顺带提一句,节点拥有处理事件然后推送给下一节点的过程,而其中经历的时间与具体的处理是变化无穷,因此,节点是多维度的,仔细体会,自行想象。