在 Android 开发中,使用最多就是把耗时任务放到其他线程去执行,其他线程执行完了后,就切换到 UI 线程(也叫主线程)进行接收数据;常用代码如下:
class IntegerObserver : Observer<Int> { override fun onError(e: Throwable?) { Log.d("Observable", "onError: $e") } override fun onCompleted() { Log.d("Observable", "onCompleted:" + Thread.currentThread()) Log.d(tag, "onCompleted") } override fun onNext(t: Int) { Log.d("Observable", "onNext: $t") } } 复制代码
var observable: Observable<Int> = Observable.create { Log.d("Observable", "call:" + Thread.currentThread()) it.onNext(1) it.onNext(2) it.onNext(3) it.onCompleted() } // 1.生成一个 observer observable = observable.subscribeOn(Schedulers.io())// 2.工作线程池设置, observable = observable.observeOn(AndroidSchedulers.mainThread()) //3.把查看数据,放到主线程中看 var subscription: Subscription = observable.subscribe(IntegerObserver())// 4.触发任务(订阅开始) subscription.isUnsubscribed 复制代码
打印结果:
D/Observable: call:Thread[RxIoScheduler-2,5,main] D/Observable: onNext: 1 D/Observable: onNext: 2 D/Observable: onNext: 3 D/Observable: onCompleted:Thread[main,5,main] D/Observable: onCompleted 复制代码
以上这种模式大家都非常熟悉,那么我想从源码里面去看看,为什么,他是怎么做到线程之间的切换的呢?代码中 1
和 4
上一篇文章已经说明过了; 为了便于理解, 我把上一篇文章中的主要几个角色的图再贴一下; 命名: 图一
2
和
3
;
做这个代码分析的时候,为了便于理解,可以先把 3
这个代码注释掉;
observable = observable.subscribeOn(Schedulers.io()) // 2.工作线程池设置 //observable = observable.observeOn(AndroidSchedulers.mainThread()) //3.把查看数据,放到主线程中看 复制代码
从 图一
中我们可以看出来,创建一个 Observerable
需要一个 OnSubscribe
;以上这个地方创建 Observable 主要是需要 rx.internal.operators.OperatorSubscribeOn
这么一个 OnSubscribe对象,然后我们可以看下这个OperatorSubscribeOn 这个对象的源码:
之前设置的 scheduler
就是在这里的,然后在看 call
方法中会创建 Woker inner
, 这个 inner
执行 schedule, 就会在异步线程执行到 source.unsafeSubscribe()
;
1
这个地方的代码;
1
代码执行完后,又会回调执行到
OperatorSubscribeOn
中的第50行这里的代码,这里代码就是会执行onNext(), 这样就完整的走完一个异步任务的过程;这个线程切换最关键的代码在
OperatorSubscribeOn
中;是由这个
OperatorSubscribeOn
call() 方法去触发异步线程工作的;
从 1.1 上分析得到一个结论: subscribeOn
这个代码过程中主要是对 OnSubscibe 进行封装;然后在 subscribe() 触发的时候直接执行 OnSubscibe 的 call 方法;call 方法里面参数是 Subscriber, 可以直接执行 Subscriber 的 3 个生命周期方法;这就完成了回调过程;
subcriber() —> OnSubscribe.call(subscriber) —> schedule() —> 内部类的 Subscriber —> 触发内部类中 onNext() —> 触发被内部类包裹的 subscriber.onNext() 复制代码
observable = observable.observeOn(AndroidSchedulers.mainThread()) //3.把查看数据,放到主线程中看 复制代码
同理以上的 observable 的创建过程需要创建一个 OnSubscribe
, 那我们就先看看 observable.observeOn()
创建的 OnSubscribe
创建的对象是由哪个实现类完成, 我们一起来跟踪一下源码。
到这里个地方需要注意一下这里有个 lift
方法, lift()
方法接收的参数是 Operator
这个一个重要的角色类; 这里先留放着,等会回来说;我们再看看 lift()
方法;
从上面的代码里面,我可以看出来这个 Observable 的创建需要 rx.internal.operators.OnSubscribeLift
这个实现类,这个实现类是 implements OnSubscribe<R>
,所以我们去看下这个类的 call 方法;
这里的call 方法里面调用了红色箭头指定的地方,这里看到没有,有个 operator, 这里的 operator 就是之前在 observable.observeOn()
中实例化了一个 rx.internal.operators.OperatorObserveOn
这个类的对象;那我们继续看下这个对象的 call() 方法;
@Override public Subscriber<? super T> call(Subscriber<? super T> child) { if (scheduler instanceof ImmediateScheduler) { // avoid overhead, execute directly return child; } else if (scheduler instanceof TrampolineScheduler) { // avoid overhead, execute directly return child; } else { ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); // parent.init(); // 初始化操作 return parent; } } 复制代码
这个 call 的作用就是把一个 Subscriber 包装一下,换成另外一个 Subscriber 是 rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber
这是一个内部类,同样的他会被触发 call 方法, 再看下 call() 方法;
从上面那个 for 可以看出来,这个想要一个死循环;从 localChild.onNext(localOn.getValue(v))
这个对于这个 Subscriber 进行传递回调;那这里还有一个问题,就是这个call 怎么回调到主线程中呢?或者怎么进行线程之间的切换呢?在回退一步到 rx.internal.operators.OperatorObserveOn 的 call 方法
@Override public Subscriber<? super T> call(Subscriber<? super T> child) { if (scheduler instanceof ImmediateScheduler) { // avoid overhead, execute directly return child; } else if (scheduler instanceof TrampolineScheduler) { // avoid overhead, execute directly return child; } else { ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); // parent.init(); // 初始化操作 return parent; } } 复制代码
这里有个 parent.init()
void init() { // don't want this code in the constructor because `this` can escape through the // setProducer call Subscriber<? super T> localChild = child; localChild.setProducer(new Producer() { @Override public void request(long n) { if (n > 0L) { BackpressureUtils.getAndAddRequest(requested, n); schedule(); // 这个地方启动的线程切换操作 } } }); localChild.add(recursiveScheduler); localChild.add(this); } 复制代码
从上面这个地方来看,schedule() 就是把接收消息的事情,切换到自己的线程池里面去操作了;这个线程池中主要是 Scheduler.Worker
在工作, Worker 主要是接收 Action0, Action0 的回调就是 call(), 也就是 rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber
中的 call(), 这个 call() 里面有个循环,用来接收和处理所有的消息,代码就是上面说的那个 localChild.onNext(localOn.getValue(v))
这个调用地方是在循环体内,所以这个地方会被调用多次,就是把值传递给 subsciber.onNext(), 也就是我们的订阅者(Subscriber),我们自己的注册的回调;这个步骤主要是一直在封装 Subscriber;
从 2.1 代码分析上可以看出来, observable.observeOn() 里面的代码来看,主要也是对 OnSubscibe 进行装饰;装饰完后,然后去触发执行 Subscriber;这里有个特别需要注意的地方,在装饰 OnSubscibe 的时候,引入了一个 Operator 角色;这个角色是 OnSubscribeLift
的必要成员变量, OnSubscribeLift
的 call 方法会触发 Operator
的 call 方法;这个 call 方法里面会包装一个 新的 Subscriber
给 OnSubscribe 使用;归纳一下这个触发过程
subscribe() —> OnSubscribeLift.call() —> hook.onLift(operator).call(o) —> OperatorObserveOn.call() —> rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber. call() —> 创建线程循环接收可能被创建了 —> ObserveOnSubscriber.onNext, onComplete, onNext —> work.schedule() -> 这个在循环体内 localChild.onNext(localOn.getValue(v)); 复制代码