在探索了RxJava的源码之后,在这里做一次总结。源码大致的执行流程。
1 创建Observable,然后使用操作符转换或者处理Observable,每一个操作符,都会产生一种对应类型的Observable。我们知道Observable是链式调用的,比如:
Flowable<String> observable = Flowable // 创建FlowableJust对象 .just("hello", "world") // 创建创建FlowableMap对象,并且在创建的时候将上一步创建的对象传入FlowableMap中 .map(String::toUpperCase) ; 复制代码
首先我创建了A对象,在A对象的基础之上进行下一步操作产生B对象,在创建B对象的时候,可以将自己作为参数传递给B对象,那么B对象的source就是A对象。
2 创建消费者,我们写的消费者本质上是RxJava框架封装到最后非常方便使用的,就是一个单纯的Observer类。
3 调用subscribe方法,将上面的步骤进行串联起来。比如说我们创建了A,链式调用又创建了B,然后B又创建了C,这个时候C有B的引用,B有A的引用。
但是在最后真正调用subscribe方法的时候,爆出的对象是C对象,即用C去调用subscribe方法。
这时:
源头Observer
,C创建自己的Observer,比如 CObserver
,参数为源头Observer,并且将其设置为downstrem。 CObserver
,然后B对象接受到 CObserver
创建自己的观察者,包装 CObserver
为自己的downstream 先整理下,这个时候分别创建了:AObserver,BObserver,CObserver,源头Observer(我们自己写的),并且
AObserver:
BObserver:
CObserver:
而对于Observable来说,则与上面相反。
CObservable:
BObservable:
AObservable:
4 当Observer传递到AObserver,即最开始产生数据哪里时,其调用自身Observer的onSubscribe方法,因为每个自身的Observer都有downstream的Observer,再逐次调用downstream的onSubscribe方法。
在调用onSubscribe方法时,将自身传递进去,这时BObserver的upstream为AObserver。依次类推
@Override public void onSubscribe(Subscription s) { if (SubscriptionHelper.validate(this.upstream, s)) { this.upstream = s; downstream.onSubscribe(this); s.request(Long.MAX_VALUE); } } 复制代码
5 我们知道最终调用的是首先创建的AObserveable,AObservable的subscribeActual方法中,在调用onSubscribe之后,将生产的数据与Observer联合起来,再运行。
AObserver运行完自己的观察者逻辑后,交由自己的downstream继续进行onNext方法,直到我们自己的消费者调用onNext方法。