2020年开始了,给自己定了一个周更博客的计划。还好计划定的不算晚,我可以在第一个星期过去之前赶出第一篇,《rxjava2源码解析(一)基本流程分析》。这也是Read The Fucking Source Code系列的第一篇文章。
先给大家说说我写博客的初衷。我写博客的目的只有一个:就是成为 优质博主 。
去年的我,把焦虑当做学习的驱动力,结果很惨。头发没了,对学习的兴趣也越来越低。所以今年的我决定换种方式,去TM的焦虑。我不爱学习,我也不爱焦虑,我只想成为优质博主。所以就也有了今年周更博客的计划。我要把写博客当成打怪升级,把你们的每一次阅读当作我补一个兵,每一个点赞评论当作我的一次单杀。
为了有更好的游戏体验,我必然会把每篇技术博客写到极致,做到老少皆宜,大家都爱看。 希望大家走过路过,点个赞再走啊!在此拜谢
Read The Fucking Source Code,是程序员圈子里的一个众所周知的梗。大家都知道读源码枯燥无趣,可又不得不做,很是痛苦。我做这个系列的目的就是想让大家在阅读源码时,也能体验到愉悦。开篇第一章,决定用rxjava2源码阅读开头。因为这个框架日常都在用,面试也经常会问,已经成为Android必备技能。但是知道怎么用并不够,面试官一问原理就蒙圈可不行。所以就有了RTFSC的第一卷,rxjava2源码阅读。我会尽量把读源码这个枯燥的事情,给大家说的有趣一点,通俗一点。
首先随便写一个rxjava2的基本用法,我们根据这个简单的示例来看看rxjava2整个流程是什么样的。
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("1"); emitter.onNext("2"); emitter.onNext("3"); emitter.onNext("4"); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG,"onSubscribe"); } @Override public void onNext(String s) { Log.d(TAG,"s = "+ s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); 复制代码
上面的部分,看起来太长,我们可以先将其简化。
Observable.create(ObservableOnSubscribe).subscribe(Observer); 复制代码
Observable
, ObservableOnSubscribe
, Observer
。也就是我们日常说的,被观察者,观察者。 Observable
我们称其为装饰器, ObservableOnSubscribe
我们也称其为发射源, Observer
我们称其为处理器。为什么这么称呼,我们可以边看源码边讲。 Observable
通过一个 create
方法和一个 subscribe
方法,将发射源和处理器连接起来。 接下来我们看看这个连接在源码中是如何实现的。
Observable
首先从 Observable
的 create
入手。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null");//判空作用 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } 复制代码
create
方法,需要传入一个发射源 ObservableOnSubscribe<T>
对象,返回一个 Observable<T>
对象。 onAssembly
方法我们也暂时放在一边,只需要知道是返回传入参数就行了。那 create
方法就是返回一个 ObservableCreate
对象。
那我们来看看 ObservableCreate
这个类。
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } .... } 复制代码
ObservableCreate
这个类继承自 Observable
。 ObservableCreate
的构造方法中直接将参数中的发射源 ObservableOnSubscribe
作为source存在本地。
OK, create
方法看完了。很简单,一句话总结,创建了一个装饰器对象,将发射源存在本地备用。(有没有一种看王刚炒菜的感觉?)
为什么我们称 Observable
为装饰器?因为rxjava在这里用到了装饰器模式,而 Observable
是装饰器模式下的基类。装饰器模式这里看还不明显,看到后面就知道了。
上面create方法需要传入一个发射源 ObservableOnSubscribe
参数,我们来看看源码:
public interface ObservableOnSubscribe<T> { /** * Called for each Observer that subscribes. * @param emitter the safe emitter instance, never null * @throws Exception on error */ void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception; } 复制代码
ObservableOnSubscribe
是一个接口,我们在使用它时会重写 subscribe
方法。 subscribe
方法中定义接下来要进行的一系列事件,所以我们称 ObservableOnSubscribe
为事件发射源。 subscribe
方法有一个参数就是发射器 ObservableEmitter
(后面会详细说明)。
接下来说说下一步: subscribe
。
前面说到, Observable
的 create
方法返回的是 ObservableCreate
对象, ObservableCreate
的 subscribe
方法并没有进行重写,我们直接看 Observable
里的 subscribe
方法。
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } } 复制代码
让我们抛开那些不重要的代码,直入主题。将其中的关键代码简化之后可以变为:
public final void subscribe(Observer<? super T> observer) { observer = RxJavaPlugins.onSubscribe(this, observer); subscribeActual(observer); } 复制代码
RxJavaPlugins
这个同样先甩在一边放着不管,跟前面的 onAssembly
一样,我们只需要知道这是返回传入的 observer
就行了。
那么只有 subscribeActual(observer)
这一句关键代码了。 Observable
中 subscribeActual
是一个抽象方法,具体实现在子类中。
Observable
是装饰器模式的基类,实际上所有操作都是它的子类完成的。所以我们称其为装饰器。不只是 create
方法,其他一些操作符,例如 map
, flatMap
也是这样的。这个后面讲到操作符和线程切换的时候,你们应该会更有体会。
所以后面我们分析 Observable
的 subscribe
方法时,直接看子类中的 subscribeActual(observer)
就行。
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } 复制代码
CreateEmitter
对象 parent
,然后调用处理器 observer
的 onSubscribe
方法持有它。 source.subscribe(parent)
将其传入到 source
当中。这个 source
就是前面我们说到备用的发射源 ObservableOnSubscribe
,其中的 subscribe
方法正好需要一个发射器 CreateEmitter
。 那整条订阅线就很清晰了:
Observable
调用 create
方法,参数是一个发射源 ObservableOnSubscribe
(我们对其 subscribe
方法进行重写),生成一个 ObservableCreate
对象。 ObservableCreate
调用 subscribe
方法,参数是一个处理器 Observer
。 subscribe
方法中我们以 Observer
为参数生成了一个发射器 CreateEmitter
,并且将这个发射器作为参数,调用了发射源 ObservableOnSubscribe
的 subscribe
方法。
这个 CreateEmitter
是什么?我们来看看它的源码。
CreateEmitter
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { private static final long serialVersionUID = -3434801548987643227L; final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } @Override public void onError(Throwable t) { if (!tryOnError(t)) { RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (!isDisposed()) { try { observer.onError(t); } finally { dispose(); } return true; } return false; } @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } @Override public void dispose() { DisposableHelper.dispose(this); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } ..... } 复制代码
划重点:
CreateEmitter
是 ObservableCreate
中的一个静态内部类,继承自 AtomicReference<Disposable>
, ObservableEmitter<T>, Disposable
,我们称其为发射器。 onNext
方法中可以看出,这个发射器是直接与外部处理器对接的。 Disposable
接口,这个接口只有dispose()和isDisposed()两个方法,作用是切断发射过程。 subscribeActual
方法中我们可以看到, Observer
有调用 onSubscribe
方法持有这个 CreateEmitter
发射器对象。所以我们可以在处理器中通过 dispose
()接口随时中断发射流程。 onError
和 onComplete
两个是互斥的。只会执行一个,因为一旦执行其中一个,会立即切断发射过程。 总结一下出现的几个类:
Observable
-> 装饰器模式的基类,我们称其为装饰器。有一个 create
方法,参数是一个 ObservableOnSubscribe
发射源,会返回一个 ObservableCreate
对象。 ObservableCreate
-> 装饰器实现类。有一个 subscribe
方法,参数是 Observer
处理器。在 subscribe
方法内部,我们以 Observer
为参数生成了一个 CreateEmitter
发射器,并且将这个发射器作为参数,调用了发射源的 subscribe
方法。 ObservableOnSubscribe
-> 发射源,本身只是一个接口,我们重写了 subscribe
方法,定义了接下来要处理的事件,所以称其为发射源。 onError
和 onComplete
两个是互斥的,只会执行一个。 Observer
-> 处理器。用于处理发射器发送的数据。 再总结一下整个运行流程如下:
Observable
调用 create
方法,参数是一个发射源 ObservableOnSubscribe
(我们对其 subscribe
方法进行重写),生成一个 ObservableCreate
对象。 ObservableCreate
调用 subscribe
方法,参数是一个处理器 Observer
。 subscribe
方法中我们以 Observer
为参数生成了一个 CreateEmitter
发射器,并且将这个发射器作为参数,调用了发射源 ObservableOnSubscribe
的 subscribe
方法。 ObservableOnSubscribe
的 subscribe
方法中定义了我们要处理的事件,并将结果传递给发射器 CreateEmitter
, CreateEmitter
先判断事件流是否断开,不断开则将结果传递给处理器 Observer
。 Observer
处理结果。
这时候我们再回头看我们前面扔掉的东西, RxJavaPlugins.onAssembly
和 RxJavaPlugins.onSubscribe
。我们直接看源码。
/** * Calls the associated hook function. * @param <T> the value type * @param source the hook's input value * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) @NonNull public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; } 复制代码
方法介绍中有描述:Calls the associated hook function。
了解hook的应该就知道了,这里相当于是利用Java反射机制,对source进行了一层包装拦截。rxjava给我们提供了一个注入hook的方法,我们可以通过hook来实现在调用source之前,需要先调用我们设置的拦截函数。我们现在只需要知道有这个东西就行了,后面有这个需要再用。