RxJava是什么?根据RxJava在GitHub上给出的描述:
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java
大致意思是:
RxJava—一个可以在JVM上运行的,基于观察者模式 实现异步操作的java库。
RxJava的作用:
就是 异步
RxJava的使用,可以使“逻辑复杂的代码”保持极强的阅读性。
Rxjava github地址
RxAndorid的作用:
Android中RxAndorid与RxJava配合使用; RxAndorid 封装了 AndroidSchedulers.mainThread()
,Android开发者使用过程中,可以轻松的将任务post Andorid主线程
中,执行页面更新操作。
RxAndroid github地址
// Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { //1、“异步线程” 执行耗时操作 //2、“执行完毕” 调用onNext触发回调,通知观察者 e.onNext("1"); e.onComplete(); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { // 订阅线程 订阅的那一刻在订阅线程中执行 } @Override public void onNext(String value) { // “主线程”执行的方法 } @Override public void onError(Throwable e) { // "主线程"执行的方法 } @Override public void onComplete() { // "主线程"执行的方法 } }); 复制代码
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { // IO 线程 // 请求网络数据 e.onNext("123456"); } }).map(new Function<String, Integer>() { @Override public Integer apply(String s) { // IO 线程 // 网络数据解析(数据转化) // // throw new RequestFailException("获取网络请求失败"); return 123; } }).doOnNext(new Consumer<Integer>() { //保存登录结果UserInfo @Override public void accept(@NonNull Integer bean) throws Exception { // IO 线程 // 保存网络数据 } }).subscribeOn(Schedulers.io()) //IO线程 .observeOn(AndroidSchedulers.mainThread()) //主线程 .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer bean) throws Exception { // 更新UI } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { // 错误 显示错误页面 } }); 复制代码
Flowable是为了应对 Backpressure
产生的。
Flowable是一个 被观察者
,与 Subscriber(观察者)
配合使用
// Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { //1、“异步线程” 执行耗时操作 //2、“执行完毕” 调用onNext触发回调,通知观察者 emitter.onNext(0); emitter.onComplete(); } // 若消费者消费能力不足,则抛出MissingBackpressureException异常 }, BackpressureStrategy.ERROR) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { // 订阅时执行,发生在“订阅线程” // 这个方法是用来向生产者申请可以消费的事件数量 // 这里表明消费者拥有Long.MAX_VALUE的消费能力 s.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) { // “主线程”执行的方法 } @Override public void onError(Throwable t) { // "主线程"执行的方法 } @Override public void onComplete() { // "主线程"执行的方法 } }); 复制代码
Backpressure(背压)
即 生产者的生产速度
大于 消费者的消费能力
引起的问题。
在RxJava中有一种情况就是 被观察者发送消息十分迅速
以至于 观察者不能及时的响应这些消息
。
例如:
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { // “异步线程”中 生产者有无限的生产能力 while (true){ e.onNext(1); } } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { // “主线程”中 消费者消费能力不足,从而造成事件无限堆积,最后导致OOM Thread.sleep(2000); System.out.println(integer); } }); 复制代码
异步线程中
生产者有无限的生产能力;
主线程
中 消费者消费能力不足,从而造成事件无限堆积,最后导致OOM。
上述的现象,有个专有的名词来来形容,即: Backpressure(背压)
Subscription.request(long n)
方法是用来向 生产者申请可以消费的事件数量
。
request(long n)
方法后,生产者便发送对应数量的事件供消费者消费; 不显示调用request
就表示 消费能力为0
。 在异步调用时,RxJava中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为128,即只能缓存128个事件。
无论request()中传入的数字比128大或小,缓存池中在刚开始都会存入128个事件;当然如果本身并没有这么多事件需要发送,则不会存128个事件。
BackpressureStrategy.ERROR
策略下,如果生产者生产的事件大于128个,缓存池便会溢出,从而抛出 MissingBackpressureException
异常; BackpressureStrategy.BUFFER
策略:将RxJava中默认的128个事件的缓存池换成一个更大的缓存池,这样,消费者通过request()即使传入一个很大的数字,生产者也会生产事件。但是这种方式比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生OOM。总之BUFFER要慎用。 BackpressureStrategy.DROP
策略:当消费者处理不了事件,则丢弃。消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。 BackpressureStrategy.LATEST
策略: LATEST与DROP功能基本一致。消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。唯一的区别就是LATEST总能使消费者能够接收到生产者产生的最后一个事件。 从这段不涉及操作符和线程切换的简单例子开始:
// 创建观察者 Observer observer = new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(String o) { } @Override public void onError(@NonNull Throwable e) { Log.d(TAG, "onError data is :" + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }; // 创建被观察者 Observable observable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { e.onNext("hello"); e.onNext("world"); e.onComplete(); } }); // 订阅 observable.subscribe(observer); 复制代码
先看一下 ObservableOnSubscribe.java
这个类
public interface ObservableOnSubscribe<T> { void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception; } 复制代码
由代码可知 ObservableOnSubscribe
是一个回调接口,回调方法中参数为 ObservableEmitter
,下边看一下 ObservableEmitter
这个类。
ObservableEmitter字面意思是 被观察者发射器 ,看一下源码:
public interface ObservableEmitter<T> extends Emitter<T> { void setDisposable(@Nullable Disposable d); void setCancellable(@Nullable Cancellable c); boolean isDisposed(); @NonNull ObservableEmitter<T> serialize(); @Experimental boolean tryOnError(@NonNull Throwable t); } 复制代码
ObservableEmitter
是对 Emitter
的扩展,而扩展的方法正是 RxJava2.0 之后引入的。 提供了可中途取消等新能力 ,我们看 Emitter
源码:
public interface Emitter<T> { void onNext(@NonNull T value); void onError(@NonNull Throwable error); void onComplete(); } 复制代码
Emitter
字面意思是 发射器 ,这里边的三个方法,大家都很熟悉了。其对应了以下这段代码:
new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { e.onNext("hello"); e.onNext("world"); e.onComplete(); } } 复制代码
回调说完,下边我们来看 Observable.create(ObservableOnSubscribe<T> source)
这段代码。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } 复制代码
ObservableOnSubscribe
被用来创建 ObservableCreate
,其实 ObservableCreate
就是 Observable
的一个实现类 因此 Observable.create(ObservableOnSubscribe<T> source)
这段代码,实际是:
// ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { // IO线程中执行 e.onNext("hello"); e.onNext("world"); e.onComplete(); } }); 复制代码
ObservableOnSubscribe.subscribe
方法被执行时,用户通过调用 ObservableEmitter.onNext
方法,将数据发送出去(发送给观察者) 下边我们看一下 ObservableCreate
这个类
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } // 省略部分代码 ... } 复制代码
ObservableOnSubscribe.subscribe
方法是在 ObservableCreate.subscribeActual
方法中第四行中被执行了; subscribe
方法中,用户通过调用 ObservableEmitter.onNext
方法,将数据发送出去; subscribeActual
方法第二行,调用了 observer.onSubscribe(parent);
方法。 订阅发生时,在订阅线程主动执行了 observer
的 onSubscribe
方法; CreateEmitter
是对 ObservableCreate.subscribeActual(Observer observer)
方法传入的 Observer
的封装; CreateEmitter
的作用是任务取消时,可以不再回调其封装的观察者; observer
的 onNext
方法,由 CreateEmitter.onNext
方法调用; Observable.create(ObservableOnSubscribe<T> source);
方法最终返回一个 ObservableCreate
对象。
下边看 observable.subscribe(observer);
方法
observable.subscribe(observer);
即 订阅发生的那一刻。 observable.subscribe(observer);
实际是 ObservableCreate.subscribe(observer);
下边查看 Observable
的 subscribe(observer)
方法
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); // Observable的subscribe方法,实际执行的是subscribeActual方法 subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); // NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } } 复制代码
observable.subscribe(observer);
方法时,实际是调用了 observable.subscribeActual(observer)
方法。 observable
为 ObservableCreate
的引用,因此这里调用的是 ObservableCreate.subscribeActual(observer)
方法。 我们又回到 ObservableCreate
这个类的 subscribeActual
方法
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } // subscribeActual 方法在 订阅发生的那一刻被调用 既 observable.subscribe(observer);时被调用 @Override protected void subscribeActual(Observer<? super T> observer) { // 若中途任务取消,通过CreateEmitter 可终止对observer中方法onNext 、onError 等的回调 CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 订阅发生时,执行 观察者的onSubscribe(Disposable d) 方法 observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } // 省略部分代码 ... } 复制代码
subscribeActual
方法在 订阅发生的那一刻被调用 的;在 observable.subscribe(observer);
时被调用; observer.onSubscribe(parent);
订阅发生时,在订阅线程回调 observer
的 onSubscribe
方法 subscribeActual
方法中,传入的 Observer
会被包装成一个 CreateEmitter
;若中途任务取消,通过 CreateEmitter
可终止对 observer
中方法 onNext 、onError
等的回调; subscribeActual 中第二行代码 observer.onSubscribe(parent);
observer.onSubscribe(parent);
订阅发生时,执行 观察者的 onSubscribe(Disposable d)
方法,这里回到了以下代码
// 创建观察者 Observer observer = new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.d(TAG, "onSubscribe"); } // ... 省略onNext、onError、onComplete }; 复制代码
new CreateEmitter(observer)
,其实现了 Disposable
接口,若任务取消,则不回调传入的观察者 observer
对应的 onNext 、onError、onComplete
等方法 subscribeActual 中第四行代码 source.subscribe(parent);
source.subscribe(parent);
是 ObservableOnSubscribe.subscribe(new CreateEmitter<T>(observer));
代码最终回到 ObservableOnSubscribe
的 subscribe
:
new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { e.onNext("hello"); e.onNext("world"); e.onComplete(); } } 复制代码
subscribe
中,调用到 CreateEmitter
类的 onNext 、onComplete、onError
方法,将数据发送 CreateEmitter
中的 观察者
到此, “这段不涉及操作符和线程切换的简单例子” 的代码跟踪结束。
从这段线程切换的简单例子开始:
// 创建观察者 Observer observer = new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { // 订阅线程 订阅的那一刻在订阅线程中执行 } @Override public void onNext(String o) { // Android 主线程中执行 } @Override public void onError(@NonNull Throwable e) { // Android 主线程中执行 } @Override public void onComplete() { // Android 主线程中执行 } }; // 创建被观察者 Observable observable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { // IO线程中执行 e.onNext("hello"); e.onNext("world"); e.onComplete(); } }); // 被观察者 IO 线程 observable = observable.subscribeOn(Schedulers.io()); // 观察者 Android主线程 observable = observable.observeOn(AndroidSchedulers.mainThread()); // 订阅 observable.subscribe(observer); 复制代码
先来个我总结的RxJava2的整个代码执行流程:
在 源码阅读——简单例子 (一) 中我们了解到了 Observable.create(ObservableOnSubscribe<T> source)
实际是 如下代码:
// ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { // IO线程中执行 e.onNext("hello"); e.onNext("world"); e.onComplete(); } }); 复制代码
ObservableCreate
中含有一个 subscribeActual(observer)
方法,用于执行传入观察者的 observer.onSubscribe
方法,和间接调用 观察者的 onNext、onComplete
等方法; public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } // 省略部分代码 ... } 复制代码
subscribeActual
方法第二行,调用了传入的观察者的 observer.onSubscribe(parent);
方法; 订阅发生时,在订阅线程主动执行了 observer
的 onSubscribe
方法; subscribeActual
方法第四行,调用了传入的观察者的 observer.subscribe
方法; subscribe
方法中,用户通过调用 CreateEmitter.onNext
方法,将数据发送出去; CreateEmitter
是对 ObservableCreate.subscribeActual(Observer observer)
方法传入的 Observer
的封装; CreateEmitter
的作用是任务取消时,可以不再回调其封装的观察者; observer
的 onNext
方法,由 CreateEmitter.onNext
方法调用; 下边查看 observable.subscribeOn(Schedulers.io()) 相关代码
注:
ObservableEmitter
是 CreateEmitter
的引用,是对 Observer
的进一步封装。 CreateEmitter
在执行 onNext
时,如果任务取消,则不再回调 Observer
的 onNext
方法。
下边我们查看 Observable
类的 subscribeOn(Scheduler scheduler)
方法
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); // 生成一个ObservableSubscribeOn对象 return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); } 复制代码
RxJavaPlugins ObservableSubscribeOn
这里 Observable observable = observableCreate.subscribeOn(Schedulers.io())
代码实际是
ObservableSubscribeOn observable = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io()) 复制代码
observable.subscribeOn(Schedulers.io())
返回的是一个 ObservableSubscribeOn
的引用 public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } // ... 省略部分代码 } 复制代码
看一下 ObservableSubscribeOn
中的 subscribeActual
方法
subscribeActual
方法第二行代码中,执行了传入 Observer
的 onSubscribe
方法; subscribeActual
方法第三行: 在 scheduler
对应的 IO线程
中,执行 observableCreate
的 subscribe
方法,传入参数为 SubscribeOnObserver
,即: IO线程中
执行 observableCreate.subscribe(new SubscribeOnObserver(observer));
因此,无论 ObservableSubscribeOn.subscribeActual(observer)
在哪个线程中被调用 observableCreate.subscribe(new SubscribeOnObserver<T>(observer))
均在 IO线程 中执行,因此观察者的 e.onNext("hello"); e.onComplete();
亦在 IO线程 中执行;
下边我们查看 Observable
类的 observeOn(Scheduler scheduler)
方法
public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } // public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); } 复制代码
这里可以看到 Observable observable = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread())
实际是:
ObservableObserveOn observable = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128); 复制代码
因此 , observable.observeOn(AndroidSchedulers.mainThread())
返回的是 ObservableObserveOn
的引用。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } // ... 省略部分代码 } 复制代码
看一下 ObservableObserveOn
中的 subscribeActual
方法
subscribeActual
方法第五行代码,实际为 observableSubscribeOn.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
ObserveOnObserver
的作用是在 ObserveOnObserver
的 onNext
方法被实行时;将 observer
的 onNext
方法post到 Android主线程
中; Observable
的 subscribe(Observer observer)
方法,实际调用到了 Observable
的 subscribeActual(Observer observer)
方法; observable
实际是 ObservableObserveOn
的引用; 因此, observable.subscribe(observer)
实际执行的是 observableObserveOn.subscribeActual(observer)
到这里,我们 线程切换 (二) 的小例子变换为了以下代码:
// 创建观察者 Observer observer = new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { // 订阅线程 订阅的那一刻在订阅线程中执行 } @Override public void onNext(String o) { // Android 主线程中执行 } @Override public void onError(@NonNull Throwable e) { // Android 主线程中执行 } @Override public void onComplete() { // Android 主线程中执行 } }; // ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { // IO线程中执行 e.onNext("hello"); e.onNext("world"); e.onComplete(); } }); // ObservableSubscribeOn observableSubscribeOn = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io()) // ObservableObserveOn observableObserveOn = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128); // observableObserveOn.subscribeActual(observer); 复制代码
下边我们查看 observableObserveOn.subscribeActual(observer)
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { // source 为 observableSubscribeOn super(source); // scheduler 为AndroidSchedulers.mainThread() this.scheduler = scheduler; // false this.delayError = delayError; // 128 this.bufferSize = bufferSize; } @Override protected void subscribeActual(Observer<? super T> observer) { // AndroidSchedulers.mainThread() 为 HandlerScheduler,因此会走到else部分代码 if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } // 代码会走到else 部分 else { Scheduler.Worker w = scheduler.createWorker(); // source 为 observableSubscribeOn source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } // ... 省略部分代码 } 复制代码
subscribeActual
方法中, AndroidSchedulers.mainThread()
为 HandlerScheduler
,因此 if 中的判断语句直接忽略,直接走到代码的 else 部分。 subscribeActual
方法中,将观察者 observer
封装成了 ObserveOnObserver
;并且调用 observableSubscribeOn.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize))
observableSubscribeOn.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize))
实际是 ObserveOnObserver observeOnObserver = new ObserveOnObserver<T>(observer, w, delayError, bufferSize) // 1、“订阅线程中” —— 执行onSubscribe, 实际执行的是observer的onSubscribe方法 observeOnObserver.onSubscribe(new SubscribeOnObserver<T>(observeOnObserver)); // 2、“IO程中” —— 执行subscribe ;IO线程 subscribe方法中,用户主动调用ObserveOnObserver的onNext、onError、onComplete方法,将数据发出去 observableCreate.subscribe(new SubscribeOnObserver<T>(observeOnObserver)) 复制代码
SubscribeOnObserver
的 onNext
是将数据发送出去 SubscribeOnObserver.onNext
调用了 observeOnObserver.onNext
observeOnObserver.onNext
通过 HandlerScheduler
将 observer.onNext、observer.onError、observer.onComplete
等方法post到Android主线程中执行。 最后总结一下RxJava2的整个执行流程:
手把手教你使用 RxJava 2.0(一)
RxJava2 源码解析(一)
RxJava2 源码解析——流程