Scheduler 与 Worker 在 RxJava2 中是一个非常重要的概念,他们是 RxJava 线程调度的核心与基石。Scheduler主要负责的就是一件事情,定义好每个流模块的执行线程。
@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); } 复制代码
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } } 复制代码
ObservableSubscribeOn的subscribeActual方法执行之后,scheduler.scheduleDirect(new SubscribeTask(parent))通过这段代码将当前流运行到Scheduler的线程内。
@Overridepublic Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { if (run == null) throw new NullPointerException("run == null"); if (unit == null) throw new NullPointerException("unit == null"); run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); handler.postDelayed(scheduled, unit.toMillis(delay)); return scheduled; } 复制代码
static final class EventLoopWorker extends Scheduler.Worker { private final CompositeDisposable tasks; private final CachedWorkerPool pool; private final ThreadWorker threadWorker; final AtomicBoolean once = new AtomicBoolean(); EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.tasks = new CompositeDisposable(); this.threadWorker = pool.get(); } @Override public void dispose() { if (once.compareAndSet(false, true)) { tasks.dispose(); // releasing the pool should be the last action pool.release(threadWorker); } } @Override public boolean isDisposed() { return once.get(); } @NonNull @Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } } 复制代码
协程上下文(coroutine context)包含一个协程调度器(参阅 CoroutineDispatcher),协程调度器 用于确定执行协程的目标载体,即运行于哪个线程,包含一个还是多个线程。协程调度器可以将协程的执行操作限制在特定线程上,也可以将其分派到线程池中,或者让它无限制地运行。
public suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T ): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont -> // compute new context val oldContext = uCont.context val newContext = oldContext + context // always check for cancellation of new context newContext.checkCompletion() // FAST PATH #1 -- new context is the same as the old one if (newContext === oldContext) { val coroutine = ScopeCoroutine(newContext, uCont) return@sc coroutine.startUndispatchedOrReturn(coroutine, block) } // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed) // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher) if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) { val coroutine = UndispatchedCoroutine(newContext, uCont) // There are changes in the context, so this thread needs to be updated withCoroutineContext(newContext, null) { return@sc coroutine.startUndispatchedOrReturn(coroutine, block) } } // SLOW PATH -- use new dispatcher val coroutine = DispatchedCoroutine(newContext, uCont) coroutine.initParentJob() block.startCoroutineCancellable(coroutine, coroutine) coroutine.getResult() } 复制代码
coroutine.initParentJob()初始化父任务,这个方法需要一开始就被初始化调用的。 然后就是核心关键将block.startCoroutineCancellable(coroutine, coroutine),该方法会创建一个新的可挂起线程。 进行异步等待操作,当有值的情况下会回调将当前挂起结束,进行下一步获取值操作,然后将当前的线程返回。
@JvmStatic public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher 复制代码
internal class AndroidDispatcherFactory : MainDispatcherFactory { override fun createDispatcher(allFactories: List<MainDispatcherFactory>) = HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main") override fun hintOnError(): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used" override val loadPriority: Int get() = Int.MAX_VALUE / 2 } 复制代码
internal class HandlerContext private constructor( private val handler: Handler, private val name: String?, private val invokeImmediately: Boolean ) : HandlerDispatcher(), Delay { /** * Creates [CoroutineDispatcher] for the given Android [handler]. * * @param handler a handler. * @param name an optional name for debugging. */ public constructor( handler: Handler, name: String? = null ) : this(handler, name, false) @Volatile private var _immediate: HandlerContext? = if (invokeImmediately) this else null override val immediate: HandlerContext = _immediate ?: HandlerContext(handler, name, true).also { _immediate = it } override fun isDispatchNeeded(context: CoroutineContext): Boolean { return !invokeImmediately || Looper.myLooper() != handler.looper } override fun dispatch(context: CoroutineContext, block: Runnable) { } override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { val block = Runnable { with(continuation) { resumeUndispatched(Unit) } } handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY)) continuation.invokeOnCancellation { handler.removeCallbacks(block) } } override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY)) return object : DisposableHandle { override fun dispose() { handler.removeCallbacks(block) } } } override fun toString(): String = if (name != null) { if (invokeImmediately) "$name [immediate]" else name } else { handler.toString() } override fun equals(other: Any?): Boolean = other is HandlerContext && other.handler === handler override fun hashCode(): Int = System.identityHashCode(handler) } 复制代码
DispatchedCoroutine在上面的这个挂起函数的父类CoroutineDispatcher,会调用dispatch方法,进行线程切换操作,然后是不是和上面的rxjava 有点似曾相似的感觉。