RxJava 目前已经是 3.x 版本了,但是我司一直在用 1.x 的版本;所以我准备去从 RxJava 1.x 开始,去看它的源码结构,从源码中去理解 RxJava 中的一些概念和实现原理。其实我对 RxJava 也只是会用,没有系统的去了解;我希望我自己能够从源码里面理解他的设计思想;
Rxjava 主要使用观察者模式的思想,所以它主要有 观察者
和 被观察者
2个角色,虽然用起来很简便,但是其实里面重要的角色还有很多。首先我列一下我认为比较重要的类或者动作;
rx.Observable: 可订阅的内容(被观察者)
rx.Observer: 观察者,虽然感觉上观察者和被观察者是一对,但是实际使用的时候 Observable 和 rx.Subscriber 才是配对使用的;
rx.Observable.OnSubscribe: 这个是一个内部类,在 Observable 中很重要的一个类,每个 Observable 的对象必须要有 onSubscribe 这个对象;
rx.Subscriber: 集 Observer 和 Subscription 一身的类 ,这里有个很重要的方法,setProducer(), 设置一个发射器,用来把数据发送出去。
rx.Subscription: 维护订阅关系, 这个主要是在释放资源的时候使用一下;
subscribe(): 这是一个订阅动作,数据流动开始的一个触发地方;
从上面这个列表来看,会发现里面的单词和类长的有点像,一开始我一直在这些单词里面来回切换,而且里面还有很多 Action, Function 相关的东西,搞的自己有点头晕。那么我们从一个简单例子进行分析入手:
Observable observable = Observable.just(1, 2, 3); // 生成一个被观察者 Observer observer = new Observer<Integer>() { // 生成一个观察者 @Override public void onCompleted() { Log.d("Observable", "onCompleted"); } @Override public void onError(Throwable e) { Log.d("Observable", "onError: " + e); } @Override public void onNext(Integer integer) { Log.d("Observable", "onNext: " + integer); } }; Subscription subscription = observable.subscribe(observer); // 生成订阅关系 复制代码
上面这段代码包括三种类: Observable(被观察者), Observer(观察者), Subscription(订阅关系);以上 就是最简单的使用。这 3 个类,应该大部分人都能够理解。那么我们再去看看这些类的内部结构,首先我们从 Obserable 入手;
这个是一个创建 Observable 的过程,跟踪代码会发现, 需要实例化一个 OnSubscriber 实例:
public static <T> Observable<T> from(T[] array) { int n = array.length; if (n == 0) { return empty(); } else if (n == 1) { return just(array[0]); } return create(new OnSubscribeFromArray<T>(array)); // 这个地方创建了实例 } 复制代码
所以说每个 Observable 必须有个 OnSubscriber 和它相伴;我们继续跟踪一下源码发现;
public final class OnSubscribeFromArray<T> implements OnSubscribe<T> { final T[] array; public OnSubscribeFromArray(T[] array) { this.array = array; } @Override public void call(Subscriber<? super T> child) { child.setProducer(new FromArrayProducer<T>(child, array)); // 看这里 } 复制代码
这个 OnSubscribeFromArray 会新建一个 Producer, 并且把它和 child(也就是Subscriber), 做关联(仔细看这里的实现,就会发现他们其实是双向关联);来我们再看下 Subscriber.setProducer() 的方法;
public void setProducer(Producer p) { long toRequest; boolean passToSubscriber = false; synchronized (this) { toRequest = requested; producer = p; if (subscriber != null) { // middle operator ... we pass through unless a request has been made if (toRequest == NOT_SET) { // we pass through to the next producer as nothing has been requested passToSubscriber = true; } } } // do after releasing lock if (passToSubscriber) { subscriber.setProducer(producer); } else { // we execute the request with whatever has been requested (or Long.MAX_VALUE) if (toRequest == NOT_SET) { producer.request(Long.MAX_VALUE); // 看这里,我在这里呢 } else { producer.request(toRequest); } } } 复制代码
仔细看,发现里面有 producer.request() 方法,这个方法就是触发onNext的地方;我画了一张关系图;可以一起来看下;
just-observerable-图.png
上面这张图,说明在使用 Observable.just(1,2,3) 的时候创建了几个主要关键的4种角色。右边没有颜色的是主要基类;在这里除了 Producer, 另外 3 个应该比较熟悉,其实如果在使用其他方式创建 Observable 的时候,这个 Producer 不是必选的。如下:
Observable observable1 = Observable.create(new Observable.OnSubscribe<Object>() { @Override public void call(Subscriber<? super Object> subscriber) { subscriber.onNext(111); } }); 复制代码
那么这个Producer 到底有什么用呢??其实主从注释来看,主要用来控制 Obserable 和 Subscriber 之间数据流动量, 关于更详细的理解可以去看下Producer 这篇文章,讲的比较好。
上一节说明生成被观察者过程中出现的主要 4 中角色: Observable, OnSubscriber, Producer, Subscriber; 接下来说说触发的过程,Observable 生成好了,那么就是触发订阅关系了代码如下:
Subscription subscription = observable.subscribe(observer); // 生成订阅关系 复制代码
如果继续深入下去,你就会发现主要触发执行的是
static <T> Subscription subscribe(Subscriber<? super T> subscriber /*标记一*/, Observable<T> observable/*标记二*/) { //去掉没用代码 // new Subscriber so onStart it subscriber.onStart(); try { // allow the hook to intercept and/or decorate, 标记三 hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { // ... 异常代码去掉 return Subscriptions.unsubscribed(); } } 复制代码
标记一
: 是上一个 Observer 包装一下,变过来的。也就是一开始我们自己写的 Observer; 你会发现,我们传递过来的 Observer 最后都会包装成 Subscriber, 也就是说, Observable 和 Subscriber 才是一对
。
标记二
: 这个是 Observable 本身,上面传下来的 this。
标记三
:这个地方的注释也可以看出,这个 hook 就是用来拦截或者装饰一下 onSubscribe 和 observable; 但是这个地方是没有实现的,所以暂时不用去管;hook 的类型是 rx.plugins.RxJavaObservableExecutionHook
这个从名字上来看,我一直以为这个类对后面的流程会影响很多,其实这个类是一个空实现,这个类的目的主要是用来观察 observable 在执行过程中的生命周期; 比如可以写些监听日志,或者在关键生命周期对应的地方做些手脚,但是目前是没有实现,如果要用需要自己实现一个,并且配置在system.properties 里面。暂时意义不大;
从跟踪 subscribe()
一系列方法,你会发现 Observer, Subscriber, Observable, Producer 这些对象使用装饰模式比较多,换个皮套一下,还是自己这种类型;
触发完订阅后,生成一个产物就是 Subscription 这个,这个主要作用是随时可以取消掉订阅;
本文主要关注对象是 Observable, OnSubscribe, Observer, Subscriber, Producer,Subscription; 画了一个类图; 供以后参考;
rxjava-主要类图.png
本来是想用一篇文章总结和记录我对 RxJava1.x 源码的阅读,发现有很多东西要写,一篇根本写不下。也不太容易写,我的想法是每篇文章需要记录的知识点不要太多,不然以后我自己都不愿意看,所以我打算把文章拆开一下,再写几篇。在看源码会发现过程中,RxJava 用了很多装饰模式,一个对象一层嵌了一层,还有很多 Action, Function 等东西,不够耐心,不够仔细是很容易被绕晕的,这充分说明我看源码的能力还是很弱的。分析这个源码对我最大的收获是,从源码里面我看到了装饰模式,而且还看到 AtomicXX相关的东西使用,CAS ( compareAndSwap)业界叫自旋锁, 这个在 Android 写应用的时候是不会去碰触的。看到优秀框架的时候,不仅仅要学习其理念,也要学习其编码方式;祝自己进步;