转载

RxJava2 源码二:链式的秘密

上一篇文章,对Rx进行了梳理,大致了解了什么是Rx,并对RxJava的基本流程进行了跟踪,如忘记其中关键点,及时回顾。

传送门

关于链式

从Rx的机制上来说,事件的持续进行运转于调用链之上,也就是说,调用链上的每一环节,承载了各自的任务,并对事件的最终完成或异常状态提供了对应出口。那分门别类的任务,是如何嵌入调用链中,并得以完成的呢? 此篇文章,将对此进行解析。

链式

回顾

在解析之前,先对基本流程进行回顾:

  1. 拿到Observable
  2. Observable.subscribe()与观察者签订
  3. subscribeActual()获得执行时机,执行具体逻辑,通知观察者已签订
  4. 数据或事件到来,观察者进行响应

案例

Observable<Integer> ob = Observable.fromArray(new Integer[]{1,2,3,4,5,6});  // 数据
ob.filter(new Predicate<Integer>() { // 过滤
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer % 2 == 0;
    }
}).map(new Function<Integer, String>() { // 数据转换
    @Override
    public String apply(Integer integer) throws Exception {
        return integer.toString();
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        
    }

    @Override
    public void onNext(String s) { // 响应
        Log.d(TAG, "onNext: " + s);
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() { // 响应
        Log.d(TAG, "onComplete: ");
    }
});
复制代码

从上游向下游发射了6个数据,过滤掉不满足过滤条件 integer % 2 == 0 的数据,并将过滤后的数据转换为String类型,下游最终拿到3个数据,进行了4次响应。 这一调用链中,进行了两个额外任务——过滤、转换,但却能保证最终到达下游,那,如何完成?

上游的构建

从基本流程看,第一步,拿到Observable。从每次调用的返回看,包括Observable.fromArray(),Observable.filter(),Observable.map()均会拿到Observable,但,Observable会有不同。分别进行跟踪,在当前情况下,拿到的Observable依次如下:

// Observable.filter() 
public final Observable<T> filter(Predicate<? super T> predicate) {
    ObjectHelper.requireNonNull(predicate, "predicate is null");
    return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}

// Observable.map()
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

// Observable.fromArray()
public ObservableFromArray(T[] array) {
    this.array = array;
}

Observable.filter()    
public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) {
    super(source);
    this.predicate = predicate;
}

Observable.map()
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
    super(source);
    this.function = function;
}
复制代码

除了fromArray()外,均接收一个ObservableSource参数,并返回一个Observable。而接收到的ObservableSource,就是调用当前接口的Observable , 显然,Observable被装饰了,并且是层层装饰。 注意到,在Rx提供的类似fromArray(),比如前文的Observable.create()的一系列方法,接收参数并不遵守这一规则。原因是,这一系列接口所提供的Observable所处位置比较特别,这类Observable处于事件的的发源地,作为接收数据的源头,而数据的来源方式,千差万别,因此这类Observable的构造方式也大不相同。

这里需插播一个信息,即Observable.map()与Observable.filter()这一类操作所拿到的Observable被标示为AbstractObservableWithUpstream,取map()来看,如下

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U>

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T>
复制代码

AbstractObservableWithUpstream的作用在于,指明此类Observable,将会对所包装的Observable进行额外处理。 其实,Observer也会有相应的行为,下面再谈。

回到当前,说到经过一系列的调用之后,形成了一条调用链,而拿到的Observable是被层层包装的,因此,实际拿到的是一条包装链,如下图:

RxJava2 源码二:链式的秘密

案例中,到Observable.map()为止,上游链条建立完毕,由外至内为ObservableMap -> ObservableFilter -> ObservableFromArray。紧接着,与观察者签订。

下游的构建

与上游对应的,其实在签订之后,下游也会形成链条的。 再次回忆之前所说的基本流程:

  1. 拿到Observable
  2. Observable.subscribe()与观察者签订
  3. subscribeActual()获得执行时机,执行具体逻辑,通知观察者已签订
  4. 数据或事件到来,观察者进行响应

此刻,拿到了最外层为ObservableMap的上游链条(最后调用为Observable.map()),因此,实际与案例里定义的观察者签订的Observable为ObservableMap。在第一篇讲解中说过,subscribe()会将Observer交给subscribeActual(),由subscribeActual()来执行具体的逻辑。

// map()拿到的ObservableMap
@Override
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}

// filter()拿到的ObservableFilter
@Override
public void subscribeActual(Observer<? super T> s) {
    source.subscribe(new FilterObserver<T>(s, predicate));
}

// fromArray()拿到的ObservableFromArray
@Override
public void subscribeActual(Observer<? super T> s) {
    FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
    s.onSubscribe(d);
    if (d.fusionMode) {
        return;
    }
    d.run();
}
复制代码

可以看到,从最外层的Observable开始,不断地执行subscribe() -> subscribeActual() -> subscribe() -> subscribeActual() ... 直到达到最内层Observable的 subscribeActual()。 期间通过subscribe()交接的Observer也经历了对应的包装变化。Observer的包装次序为 自定义的Observer <- MapObserver <- FilterObserver 。 除最内层的Observer外,其他的Observer均以另一个Observer为构造参数,以便装饰,可通过跟踪任意Observer源码看出,此不贴出。

当前,下游也形成了一条调用链。上下游的调用链如图:

RxJava2 源码二:链式的秘密

在到达最内层的Observable之后,Observer链构建完毕,并且最内的Observable将Observer链的最外层以Disposable形式交给API的使用者, 以上执行此承操作的代码为 s.onSubscribe(d)。

以上上下游链条的构建过程需要仔细琢磨,很重要。

事件的处理

到此处为止,RxJava的调用链式构造完毕了的。那么,为什么要这样做呢?

一句话回答:“处理差异性需求。”

在Rx中,创建、过滤、变换、结合、辅助操作等,提供的以操作符形式的让得以以链条形式完成一系列工作的行为,均是为了处理差异性需求,在本文案例中表现为fromArray(),filter(),map()。而对于种种的差异性需求来说,先以上游或下游为大环境,再以其中某一节点为具体场景进行处理。因此需要分别在上下游分别建立相应的调用链,并构建出场景明确的链条节点,以此达到Rx完成需求的目的。

在有了双链并确定了节点场景之后,处理差异性需求就简单了,仅需在当前场景处理响应事件之后,让事件继续流动向下一节点皆可。如图

RxJava2 源码二:链式的秘密

当事件或数据来到当前节点时,一般会遵循如图顺序流动。

回到案例。当前情况,已在上下游构建好相应的调用链,而在最内的Observable里,发射了数据

// ObservableFromArray
@Override
public void subscribeActual(Observer<? super T> s) {
    FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

    s.onSubscribe(d);

    if (d.fusionMode) {
        return;
    }
    // 发射数据
    d.run();
}

// FromArrayDisposable
void run() {
    T[] a = array; // 要发射的数据
    int n = a.length;

    for (int i = 0; i < n && !isDisposed(); i++) {
        T value = a[I];
        if (value == null) {
            actual.onError(new NullPointerException("The " + i + "th element is null"));
            return;
        }
        // actual为最外的Observer,为FilterObserver
        actual.onNext(value);
    }
    if (!isDisposed()) {
        actual.onComplete();
    }
}
复制代码

在数据开始推送后,下游链的每个节点,依次接收到数据事件的到来,并进行相应的处理。根据之前的分析,当前案例的下游链为自定义的FilterObserver -> MapObserver -> 自定义的Observer 。 对应代码如下

// FilterObserver
@Override
public void onNext(T t) {
    if (sourceMode == NONE) {
       ......
        if (b) {
            // 将满足过滤条件的数据发送下一节点,为MapObserver
            actual.onNext(t);
        }
    } else {
        actual.onNext(null);
}

// MapObserver
@Override
public void onNext(T t) {
    ......
    try {
        // 将数据t进行转换
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    // 将数据v发送给下一节点,为自定义的Observer
    actual.onNext(v);
}
复制代码

当事件到达调用链的某一节点,此节点将进行对应的处理,并根据需求决定事件应如何收发,而在最终的节点里,将会收到经历了所有变迁的事件。

注意到,在Observable.fromArray()、Observable.filter()、Observable.map()等操作符里,需要携带对应的参数,如Function、Predicate等。此类参数的作用为,携带链上节点所需信息,协助构建节点或协助处理事件,但不为本文重点,故不提及。

总结

Rx中,为了在链式表达中完整一系列事件需求,分别在上下游构建了对应的调用链。

  • 上游 : 对于上游来说,链的形成通过相应操作符的操作构建
  • 下游 : 对于下游来说,链通过Observable.subscribe()与Observable.subscribeActual()构建。

事件到来时,节点将会进行响应,并根据自身特点在合适的时机将事件推送给下一节点。通过上下游调用链的运行机制,能让Rx得以以链条不断裂的前提下,完成复杂的事件交互,将事件的流程主干铺陈出来,剥离异常与分支流程。而Rx可插拔的运作方式也体现在节点的删减中。

顺带提一句,节点拥有处理事件然后推送给下一节点的过程,而其中经历的时间与具体的处理是变化无穷,因此,节点是多维度的,仔细体会,自行想象。

原文  https://juejin.im/post/5eb3a6d0f265da7b9450bc2d
正文到此结束
Loading...