上一篇文章 Rxjava2源码解析:最简链式解析 里我们讲到Rxjava2 从创建一个事件到事件被观察的过程原理,这篇文章我们讲Rxjava2中链式调用的原理。本文不讲用法,仍然需要读者熟悉Rxjava基本的用法。
Rxjava是解决异步问题的,它的链式调用让代码看起来非常流畅优雅。现在我们带上线程切换以及链式调用来看看。下面代码是示例:
Observable .create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("a"); } }) .subscribeOn(Schedulers.io()) .unsubscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Exception { return 1; } }) .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Object o) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
我们创建一个事件(观察者),想输出一个字符串 "a"。这个事件发生在IO线程,结束也在IO线程,事件的状态回调发生在主线程。示例的用法大家应该都能懂,我们主要讨论这个链式的原理流程。为什么这么说呢?因为这个链式跟一般的链式不太一样。
这个方法我们之前看过,返回一个ObservableCreate对象,ObservableCreate继承自Observable,里面的source存着我们创建的ObservableOnSubscribe匿名对象。
这是Obserbvable的方法,先看源码:
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); } public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; }
代码结构跟create的差不多,在钩子函数里直接返回我们创建的对象 ObservableSubscribeOn<T>(this, scheduler)
,并传入当前的Observable也就是ObservableCreate对象。所以我们看一下这个类的代码:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } }
这个类继承自AbstractObservableWithUpstream类,构造函数的参数是ObservableSource,所以这里我们需要介绍两个类:
- ObservableSource</br>
ObservableSource是一个接口,所有的Observable都实现了这个接口,它里面只有:
void subscribe(@NonNull Observer<? super T> observer);
这一个方法。很明显这个方法是为了让Observer订阅Observable的,或者说为了Observable把事件状态传递给Observer的。
- AbstractObservableWithUpstream</br>
这个类继承了Observbable
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> { protected final ObservableSource<T> source; AbstractObservableWithUpstream(ObservableSource<T> source) { this.source = source; } }
从源码可以看出这个类有变量source,它在构造函数里传入值,存储ObservableSource对象。
所以当我们调用Observable的subscribeOn方法的时候会创建一个ObservableSubscribeOn对象,并用变量source存储当前的Observable对象,然后返回ObservableSubscribeOn对象。
public final Observable<T> unsubscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableUnsubscribeOn<T>(this, scheduler)); } public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; }
这个方法跟上面的方法是一个模子刻的。所以我们主要看ObservableUnsubscribeOn这个类就好。
public final class ObservableUnsubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableUnsubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } }
这个类跟刚才的ObservableSubscribeOn也几乎一模一样,继承自AbstractObservableWithUpstream类,使用source存了当前Observable对象。而此时的Observbvable对象是上一个方法创建的对象,也就是ObservableSubscribeOn对象。
由于这些方法的内容基本一样我就省略代码的解释。
observeOn方法是创建了ObservableObserveOn对象,并保存上一个方法创建的Observable
map方法是创建ObservableMap对象,并保存上一个方法创建的Observable
所以总结一下可知:链式调用这些方法的时候,都会创建一个相关的对象,然后用变量source存储上一个方法创建的Observable子类对象。
上次文章讲到,这个方法内部会调用一个抽象方法,subscribeActual方法,作为真实的订阅。
而这个方法的逻辑需要看子类如何实现。
而第一次调用该这个subscribe方法的对象是ObservableMap对象。所以我们看看它内部如何实现的。
ObservableMap的subscribeActual方法实现:
public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); }
内部调用了source的subscribe方法。此时ObservableMap对象里存的source是上一个方法创建的observable,也就是ObservableObserveOn对象。所以我们要看看ObservableObserveOn是如何实现subscribeActual方法的:
protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
同理他最终也是调用了上一个Observable的subscribe。
于是我们知道当我们调用subscribe方法的时候,会递归式的调用source存储的上一个方法创建的Observable的subscribeActual方法,一直到ObsservableCreate的subscribeActual的方法,把事件状态传递给观察者。这个上一篇文章已经讲过。
我们常见的普通的链式调用一般都会返回当前同一个对象。和普通的链式调用不同当我们调用Rxjava2的链式调用时,他们会返回自己对应的Observable子类对象,每个对象都不一样,然后在subscribeActual方法中递归式的调用每个对象的subscribeActual方法,完成一个链式的调用。