众所周知RxJava有许多优点比如强大的链式调用,方便的线程调度,但是我对其原理还是了解的太少了,因此打算阅读下源码,先从一个最基本的例子开始
这个例子只是为了示例,正常情况下也不会这么写
fun main() { Observable.create { emitter: ObservableEmitter<Int> -> println("onSourceSubscribe") emitter.onNext(1) emitter.onNext(2) emitter.onNext(3) emitter.onComplete() }.subscribe(object : Observer<Int> { override fun onComplete() { println("onComplete") } override fun onSubscribe(d: Disposable) { println("onObserverSubscribe") } override fun onNext(t: Int) { println("onNext $t") } override fun onError(e: Throwable) { println("onError") } }) } 复制代码
输出结果:
onObserverSubscribe onSourceSubscribe onNext 1 onNext 2 onNext 3 onComplete 复制代码
那么为什么会按这个顺序输出呢?从代码中也可以看出从始至终也只调用了create、subscribe两个方法,先来看看create的源码
// Observable.java public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; } 复制代码
可以看出当我们没有设置onObservableAssembly时其实就是直接创建了一个ObservableCreate实例返回,接着看看subscribe
public final void subscribe(Observer<? super T> observer) { subscribeActual(observer); } 复制代码
可以看到内部就是调用了subscribeActual方法,而这个方法是个抽象方法,ObservableCreate实现了该方法
protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); source.subscribe(parent); } 复制代码
内部主要就是先创建了一个CreateEmitter实例,然后调用observer的onSubscribe方法,最后再调用source的subscribe方法,这就解释了onObserverSubscribe和onSourceSubscribe的输出,而source的subscribe方法又调用了三次onNext方法和一次onComplete方法,先看看onNext
// CreateEmitter.java public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } 复制代码
如果还没dispose那么直接就调用了observer的onNext,这也就解释了onNext 1、onNext 2、onNext 3三个输出接着看onComplete
// CreateEmitter.java public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } 复制代码
如果还没dispose就直接调用observer的onComplete,直接解释了onComplete的输出,我们注意到Observer还有一个onError回调,该方法可以通过调用emitter.onError手动触发
// CreateEmitter.java public void onError(Throwable t) { if (!tryOnError(t)) { RxJavaPlugins.onError(t); } } public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (!isDisposed()) { try { observer.onError(t); } finally { dispose(); } return true; } return false; } 复制代码
可以看到当还没被dispose就会调用到observer的onError方法,至此这个基本demo的源码已经分析完毕。 总结下上述代码其实就分为如下几步
上述实例的整体流程图如下
下面来从源码的角度研究研究RxJava中的几个基本方法
首先从最基本的map方法开始
map方法将每个onNext事件都调用所传入的Function实例的apply方法来达到转化数据源的效果,如下图所示
实例代码如下所示
fun main() { Observable.create { emitter: ObservableEmitter<Int> -> println("onSourceSubscribe") emitter.onNext(1) emitter.onNext(2) emitter.onNext(3) emitter.onComplete() } .map { it + 1 } .subscribe(object : Observer<Int> { override fun onComplete() { println("onComplete") } override fun onSubscribe(d: Disposable) { println("onObserverSubscribe") } override fun onError(e: Throwable) { println("onError") } override fun onNext(t: Int) { println("onNext $t") } }) } 复制代码
输出结果:
onObserverSubscribe onSourceSubscribe onNext 2 onNext 3 onNext 4 onComplete 复制代码
很明显map方法会对所有的next的数据做一次变化这里是加1,接着看看map的源码实现
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); } 复制代码
内部创建了一个ObservableMap实例并将当前的Observable实例和Function实例传入,根据本文一开始的分析当调用Observable的subscribe方法其实会调用subscribeActual方法
// ObservableMap.java public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } 复制代码
创建了MapObserver实例将Observer实例进行包装然后调用source.subscribe,这个source其实就是上一级Observable实例本例中对应ObservableCreate实例,接着根据上文的分析会调用该MapObserver实例的onNext三次然后调用一次onComplete
// MapObserver.java public void onNext(T t) { // 初始化的时候done为false if (done) { return; } // 初始化的时候就是NONE if (sourceMode != NONE) { downstream.onNext(null); return; } U v; try { v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } downstream.onNext(v); } 复制代码
我们可以看到内部调用了mapper.apply方法,接着将拿到的结果v当做参数调用downstream的onNext方法,注意这里的downStream就是外界创建的一个Observer对象。上述实例是整体流程图如下。注:绿色框表示对象,蓝色框表示方法调用,括号内为简称
综上我们可以知道map通过代理下游Observer实例完成数据转换,接着看看flatMap的源码实现
flatMap方法用于将上游的每一个onNext事件都转换成一个Observable实例,如下图所示
fun main() { Observable.create { emitter: ObservableEmitter<Int> -> println("onSourceSubscribe") emitter.onNext(1) emitter.onNext(2) emitter.onNext(3) emitter.onComplete() } .flatMap { Observable.just(it, it + 1) } .subscribe(object : Observer<Int> { override fun onComplete() { println("onComplete") } override fun onSubscribe(d: Disposable) { println("onObserverSubscribe") } override fun onError(e: Throwable) { println("onError") } override fun onNext(t: Int) { println("onNext $t") } }) } 复制代码
输出结果:
onObserverSubscribe onSourceSubscribe onNext 1 onNext 2 onNext 2 onNext 3 onNext 3 onNext 4 onComplete 复制代码
很显然flatMap将每一个事件比如1转换成一个拥有1、2两个事件的Observable实例,来看看其源码实现
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) { return flatMap(mapper, false); } public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) { return flatMap(mapper, delayErrors, Integer.MAX_VALUE); } public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) { return flatMap(mapper, delayErrors, maxConcurrency, bufferSize()); } public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize)); } 复制代码
默认delayErrors为false表示当一个事件出现异常就会停止整个事件序列,默认并发数为Int的最大值,默认缓存大小为128,然后根据这些参数和当前Observable实例构建出一个ObservableFlatMap实例,我们看看其subscribeActual方法
// ObservableFlatMap.java public void subscribeActual(Observer<? super U> t) { source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize)); } 复制代码
内部又通过这些参数和下游的Observer实例构建了一个MergeObserver实例,直接看看其onSubscribe方法
// MergeObserver.java public void onSubscribe(Disposable d) { // 只会回调一次下游的onSubscribe方法 if (DisposableHelper.validate(this.upstream, d)) { this.upstream = d; downstream.onSubscribe(this); } } 复制代码
如果已经有上游了就不做任何处理不然进行上游的赋值,然后回调了下游也就是自定义的那个Observer的onSubscribe方法,接着看看其onNext方法是怎么把一个输入源转化成一个Observable的
public void onNext(T t) { ObservableSource<? extends U> p; p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource"); subscribeInner(p); } 复制代码
先是调用了传入的apply方法将每个onNext数据源转化为Observable实例,接着调用subscribeInner方法
void subscribeInner(ObservableSource<? extends U> p) { for (;;) { if (p instanceof Callable) { ... } else { InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++); if (addInner(inner)) { p.subscribe(inner); } break; } } } boolean addInner(InnerObserver<T, U> inner) { for (;;) { InnerObserver<?, ?>[] a = observers.get(); int n = a.length; InnerObserver<?, ?>[] b = new InnerObserver[n + 1]; System.arraycopy(a, 0, b, 0, n); b[n] = inner; if (observers.compareAndSet(a, b)) { return true; } } } 复制代码
为每个Observable对象都创建了一个InnerObserver实例,然后将其放入到一个数组中去,最后调用subscribe方法进行订阅,由于apply方法返回了一个ObservableFromArray实例,所以看看其subscribeActual方法
// ObservableFromArray.java public void subscribeActual(Observer<? super T> observer) { FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array); observer.onSubscribe(d); if (d.fusionMode) { return; } d.run(); } 复制代码
observer指代InnerObserver,看看其onSubscribe方法
public void onSubscribe(Disposable d) { // 第一次调用会返回true,d就是FromArrayDisposable实例其派生自QueueDisposable if (DisposableHelper.setOnce(this, d)) { if (d instanceof QueueDisposable) { QueueDisposable<U> qd = (QueueDisposable<U>) d; // 相当于传入了7 int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); if (m == QueueDisposable.SYNC) { fusionMode = m; queue = qd; done = true; parent.drain(); return; } if (m == QueueDisposable.ASYNC) { fusionMode = m; queue = qd; } } } } //FromArrayDisposable.java public int requestFusion(int mode) { // 很明显7 & 1 != 0 if ((mode & SYNC) != 0) { fusionMode = true; return SYNC; } return NONE; } 复制代码
这里暂时还没法理解这个fusionMode(混合模式)是干什么用的,接着会调用到MergeObserver的drain方法
void drain() { // 只会执行一次,循环将所有事件取出 if (getAndIncrement() == 0) { drainLoop(); } } // 当取消了或者出现了错误并其dealyErrors为false时会将所有InnerObserver都dispose掉 boolean checkTerminate() { if (cancelled) { return true; } Throwable e = errors.get(); if (!delayErrors && (e != null)) { disposeAll(); e = errors.terminate(); if (e != ExceptionHelper.TERMINATED) { downstream.onError(e); } return true; } return false; } void drainLoop() { // 这里的downstream就是外界自定义的Observer实例 final Observer<? super U> child = this.downstream; for (;;) { ... for (int i = 0; i < n; i++) { if (checkTerminate()) { return; } InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j]; SimpleQueue<U> q = is.queue; if (q != null) { for (;;) { U o; try { o = q.poll(); } catch (Throwable ex) { ... if (checkTerminate()) { return; } continue sourceLoop; } // 每取出一个便会调用下游的onNext方法 child.onNext(o); } // 会把一个Observable源所有的数据都取完了以后才会进入下一个 if (o == null) { break; } } ... } .. } } 复制代码
drainLoop内部会从数组中一个个取出InnerObserver实例,并取出所对应的数据源然后每取出一个回调下游Observer的onNext方法,下面用一张图来总结下实例的流程