Scheduler 与 Worker 在 RxJava2 中是一个非常重要的概念,他们是 RxJava 线程调度的核心与基石。Scheduler主要负责的就是一件事情,定义好每个流模块的执行线程。
源码分析我们先从subscribeOn方法开始。
@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); } 复制代码
上述代码可以看出,subscribeOn的核心代码是ObservableSubscribeOn,这个类只做了一件事情,它会把上一个流用装饰者模式包装了一下,当上一个流被执行的时候会将流执行到scheduler的线程上去。
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } } 复制代码
ObservableSubscribeOn的subscribeActual方法执行之后,scheduler.scheduleDirect(new SubscribeTask(parent))通过这段代码将当前流运行到Scheduler的线程内。
之后我们可以看下Scheduler的实现累,RxAndroid的HandlerScheduler,看看对于安卓的调度器,RxJava是怎么写的。
@Overridepublic Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { if (run == null) throw new NullPointerException("run == null"); if (unit == null) throw new NullPointerException("unit == null"); run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); handler.postDelayed(scheduled, unit.toMillis(delay)); return scheduled; } 复制代码
scheduler.scheduleDirect执行的时候就会调用scheduleDirect方法。看得出来,方法被触发之后调用了handler的postdelay方法,将当前的Runnable运行到该handler的线程上去。
static final class EventLoopWorker extends Scheduler.Worker { private final CompositeDisposable tasks; private final CachedWorkerPool pool; private final ThreadWorker threadWorker; final AtomicBoolean once = new AtomicBoolean(); EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.tasks = new CompositeDisposable(); this.threadWorker = pool.get(); } @Override public void dispose() { if (once.compareAndSet(false, true)) { tasks.dispose(); // releasing the pool should be the last action pool.release(threadWorker); } } @Override public boolean isDisposed() { return once.get(); } @NonNull @Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } } 复制代码
接下来我们要说另外一个Worker,上面是IoScheduler里面的Worker代码。Work在初始化的时候会去生成一个线程池,这个线程池就是我们后续schedule执行的地方,当一个Runnnable被调度到这个work上的时,会调用schedule方法,然后将这个Runnnable运行到这个线程池上去。
协程上下文(coroutine context)包含一个协程调度器(参阅 CoroutineDispatcher),协程调度器 用于确定执行协程的目标载体,即运行于哪个线程,包含一个还是多个线程。协程调度器可以将协程的执行操作限制在特定线程上,也可以将其分派到线程池中,或者让它无限制地运行。
public suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T ): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont -> // compute new context val oldContext = uCont.context val newContext = oldContext + context // always check for cancellation of new context newContext.checkCompletion() // FAST PATH #1 -- new context is the same as the old one if (newContext === oldContext) { val coroutine = ScopeCoroutine(newContext, uCont) return@sc coroutine.startUndispatchedOrReturn(coroutine, block) } // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed) // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher) if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) { val coroutine = UndispatchedCoroutine(newContext, uCont) // There are changes in the context, so this thread needs to be updated withCoroutineContext(newContext, null) { return@sc coroutine.startUndispatchedOrReturn(coroutine, block) } } // SLOW PATH -- use new dispatcher val coroutine = DispatchedCoroutine(newContext, uCont) coroutine.initParentJob() block.startCoroutineCancellable(coroutine, coroutine) coroutine.getResult() } 复制代码
这是协程的线程调度的代码,当发现当前的调度器和目标调度器不是同一个的情况下,会new一个DispatchedCoroutine,开始进行线程的调度操作。
coroutine.initParentJob()初始化父任务,这个方法需要一开始就被初始化调用的。 然后就是核心关键将block.startCoroutineCancellable(coroutine, coroutine),该方法会创建一个新的可挂起线程。 进行异步等待操作,当有值的情况下会回调将当前挂起结束,进行下一步获取值操作,然后将当前的线程返回。
在调用withContext方法的时候因为我们传入的是Dispatchers.Main
@JvmStatic public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher 复制代码
而DispatcherMain是可以有外部的fatroy构造的,由安卓的kotlin支持库中可以发现,其实现类是AndroidDispatcherFactory。
internal class AndroidDispatcherFactory : MainDispatcherFactory { override fun createDispatcher(allFactories: List<MainDispatcherFactory>) = HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main") override fun hintOnError(): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used" override val loadPriority: Int get() = Int.MAX_VALUE / 2 } 复制代码
这次真的转过来了把,没有干扰了把,各位老哥,我要给你们跪了啊。
接下来我们看下重头戏HandlerContext,这个类就是和rxjava的HandlerScheduler基本一模一样的线程调度器。
internal class HandlerContext private constructor( private val handler: Handler, private val name: String?, private val invokeImmediately: Boolean ) : HandlerDispatcher(), Delay { /** * Creates [CoroutineDispatcher] for the given Android [handler]. * * @param handler a handler. * @param name an optional name for debugging. */ public constructor( handler: Handler, name: String? = null ) : this(handler, name, false) @Volatile private var _immediate: HandlerContext? = if (invokeImmediately) this else null override val immediate: HandlerContext = _immediate ?: HandlerContext(handler, name, true).also { _immediate = it } override fun isDispatchNeeded(context: CoroutineContext): Boolean { return !invokeImmediately || Looper.myLooper() != handler.looper } override fun dispatch(context: CoroutineContext, block: Runnable) { handler.post(block) } override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { val block = Runnable { with(continuation) { resumeUndispatched(Unit) } } handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY)) continuation.invokeOnCancellation { handler.removeCallbacks(block) } } override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY)) return object : DisposableHandle { override fun dispose() { handler.removeCallbacks(block) } } } override fun toString(): String = if (name != null) { if (invokeImmediately) "$name [immediate]" else name } else { handler.toString() } override fun equals(other: Any?): Boolean = other is HandlerContext && other.handler === handler override fun hashCode(): Int = System.identityHashCode(handler) } 复制代码
DispatchedCoroutine在上面的这个挂起函数的父类CoroutineDispatcher,会调用dispatch方法,进行线程切换操作,然后是不是和上面的rxjava 有点似曾相似的感觉。
没错,各位看官,这次调用了handler.post(block)。所以我这次我真的下结论了,上篇文章是有点小微妙,但是这次应该没事清楚了。