Calling subscribeOn(Schedulers.io()) on an Observable specifies the thread on which the upstream event stream is subscribed to and therefore runs.
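Schedulers: exposes the static methods used to obtain a scheduler.
IOTask: unifies how the scheduler-creating task is defined.
Scheduler: the common API that callers use; IoScheduler, ComputationScheduler and NewThreadScheduler are all subclasses.
Scheduler.Worker: manages the tasks that are actually executed.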
1. Calling this method creates an ObservableSubscribeOn object, which wraps (proxies) the upstream Observable and stores the Scheduler that was passed in.
```java
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
```
2. Execution then reaches ObservableSubscribeOn's subscribeActual method; that flow was covered in part (1). The core of this method is its third statement.
```java
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    // The class continues with the inner class SubscribeTask.
```
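3.
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
In this statement, scheduler is the Scheduler that was passed in, i.e. the Schedulers.io() object, which is backed by a thread pool that runs Runnables. SubscribeTask is a Runnable.
The thread pool executes the Runnable's run method. Inside it, source is the upstream Observable and parent is the Observer that wraps the downstream one.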
```java
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        public void run() {
            // Runs on the scheduler's thread: subscribe to the upstream here.
            source.subscribe(parent);
        }
    }
} // end of ObservableSubscribeOn
```
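The hand-off in step 3 can be illustrated without RxJava. The following is a minimal sketch using only the JDK, not the library's implementation: MiniSource, MiniSubscribeOn and the thread name "mini-io" are hypothetical names invented for this example, and a plain ExecutorService stands in for the Scheduler.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical stand-in for ObservableSource: something that can be subscribed to.
interface MiniSource<T> {
    void subscribe(Consumer<? super T> observer);
}

final class MiniSubscribeOn {
    // Mirrors the idea of subscribeActual: wrap "source.subscribe(observer)" in a
    // Runnable and hand it to the executor instead of calling it on the current thread.
    static <T> Future<?> subscribeOn(MiniSource<T> source, ExecutorService executor,
                                     Consumer<? super T> observer) {
        Runnable subscribeTask = () -> source.subscribe(observer);
        return executor.submit(subscribeTask);
    }

    // Returns the name of the thread on which the source emitted its item.
    static String emittingThreadName() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "mini-io"));
        try {
            String[] seen = new String[1];
            // The source emits the name of whatever thread it is subscribed on.
            MiniSource<String> source =
                    observer -> observer.accept(Thread.currentThread().getName());
            subscribeOn(source, io, name -> seen[0] = name).get(); // wait for the task
            return seen[0];
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(emittingThreadName()); // the executor's thread, not "main"
    }
}
```

Because the subscription itself happens inside the Runnable, everything the source does during subscribe, including emitting items, runs on the executor's thread.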
In the end, emitted items arrive at SubscribeOnObserver's onNext, which forwards them to the downstream Observer.
```java
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable>
        implements Observer<T>, Disposable {
    final Observer<? super T> actual;
    final AtomicReference<Disposable> s;

    SubscribeOnObserver(Observer<? super T> actual) {
        this.actual = actual;
        this.s = new AtomicReference<Disposable>();
    }

    @Override
    public void onNext(T t) {
        actual.onNext(t);
    }
    // Remaining Observer and Disposable methods are omitted from this excerpt.
}
```
4. How Schedulers.io() is prepared
The IO field is assigned in a static initializer block.
```java
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

static {
    // initIoScheduler invokes call() on the IOTask, which returns the IoScheduler instance.
    IO = RxJavaPlugins.initIoScheduler(new IOTask());
}
```
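The pattern in step 4, a static field filled once from a Callable, can be sketched in isolation. This is an illustrative, JDK-only sketch; MiniSchedulers, MiniIoScheduler, MiniIoTask and init are hypothetical names, and the plugin hooks that RxJavaPlugins applies are deliberately left out.

```java
import java.util.concurrent.Callable;

final class MiniSchedulers {
    // Hypothetical stand-in for a Scheduler implementation.
    static final class MiniIoScheduler { }

    // Counterpart of IOTask: a Callable whose only job is to create the scheduler.
    static final class MiniIoTask implements Callable<MiniIoScheduler> {
        @Override
        public MiniIoScheduler call() {
            return new MiniIoScheduler();
        }
    }

    static final MiniIoScheduler IO;

    static {
        // Runs exactly once, when MiniSchedulers is first initialized.
        IO = init(new MiniIoTask());
    }

    // Calls the supplier and rejects a null result.
    static MiniIoScheduler init(Callable<MiniIoScheduler> supplier) {
        MiniIoScheduler scheduler;
        try {
            scheduler = supplier.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        if (scheduler == null) {
            throw new NullPointerException("supplier returned null");
        }
        return scheduler;
    }

    // Every caller receives the same shared instance.
    static MiniIoScheduler io() {
        return IO;
    }

    public static void main(String[] args) {
        System.out.println(io() == io()); // same instance on every call
    }
}
```

Routing creation through a Callable keeps the construction logic in one place, so the initializer does not need to know which concrete class it is building.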
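5. IoScheduler extends Scheduler and implements Scheduler's abstract method createWorker. scheduleDirect is defined in the base Scheduler class and calls createWorker to obtain a Worker: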
```java
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}
```
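The shape of scheduleDirect (create a worker, wrap the Runnable, schedule the wrapper) can be sketched with JDK classes only. MiniWorker, MiniDisposeTask and MiniScheduleDirect are hypothetical names. The sketch also assumes that the wrapper releases its worker once the action has finished, which the excerpt above does not show.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class MiniScheduleDirect {
    // Hypothetical worker: owns one single-thread executor and can be released.
    static final class MiniWorker {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        final AtomicBoolean disposed = new AtomicBoolean();

        void schedule(Runnable task) {
            executor.execute(task);
        }

        void dispose() {
            if (disposed.compareAndSet(false, true)) {
                executor.shutdown();
            }
        }
    }

    // Wrapper around the action; assumed here to release the worker afterwards.
    static final class MiniDisposeTask implements Runnable {
        private final Runnable action;
        private final MiniWorker worker;

        MiniDisposeTask(Runnable action, MiniWorker worker) {
            this.action = action;
            this.worker = worker;
        }

        @Override
        public void run() {
            try {
                action.run();
            } finally {
                worker.dispose();
            }
        }
    }

    // One worker per call: create it, wrap the action, schedule the wrapper.
    static MiniWorker scheduleDirect(Runnable run) {
        MiniWorker w = new MiniWorker();
        w.schedule(new MiniDisposeTask(run, w));
        return w;
    }

    // True if the action ran and the worker was released afterwards.
    static boolean demo() {
        AtomicBoolean ran = new AtomicBoolean();
        MiniWorker w = scheduleDirect(() -> ran.set(true));
        try {
            w.executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get() && w.disposed.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```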
6. IoScheduler's createWorker method creates an EventLoopWorker object, so the w.schedule call invokes EventLoopWorker's schedule method.
```java
@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}
```
```java
static final class EventLoopWorker extends Scheduler.Worker {
    private final CompositeDisposable tasks;
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        this.threadWorker = pool.get();
    }

    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            // Do not schedule anything once the worker has been disposed.
            return EmptyDisposable.INSTANCE;
        }

        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
}
```
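The guard at the top of schedule is worth isolating: a disposed worker accepts no further work. A minimal sketch of that check follows, with hypothetical names (MiniEventLoopWorker, and a boolean return value in place of a Disposable) and an inline Executor so the behavior is deterministic.

```java
import java.util.concurrent.Executor;

final class MiniEventLoopWorker {
    private final Executor threadWorker; // stand-in for ThreadWorker
    private volatile boolean disposed;   // stand-in for tasks.isDisposed()

    MiniEventLoopWorker(Executor threadWorker) {
        this.threadWorker = threadWorker;
    }

    // Returns true if the action was accepted, false if the worker was already disposed.
    boolean schedule(Runnable action) {
        if (disposed) {
            return false; // the excerpt above returns EmptyDisposable.INSTANCE here
        }
        threadWorker.execute(action);
        return true;
    }

    void dispose() {
        disposed = true;
    }

    // Result: { accepted before dispose, accepted after dispose, action ran exactly once }
    static boolean[] demo() {
        int[] runs = {0};
        MiniEventLoopWorker w = new MiniEventLoopWorker(Runnable::run); // runs inline
        boolean before = w.schedule(() -> runs[0]++);
        w.dispose();
        boolean after = w.schedule(() -> runs[0]++);
        return new boolean[] { before, after, runs[0] == 1 };
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```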
7. ThreadWorker is the class that does the actual thread work. It extends NewThreadWorker, which is where scheduleActual is defined.
```java
static final class ThreadWorker extends NewThreadWorker {
    private long expirationTime;

    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }

    public long getExpirationTime() {
        return expirationTime;
    }

    public void setExpirationTime(long expirationTime) {
        this.expirationTime = expirationTime;
    }
}
```
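8. This is where the thread pool is finally asked to run the Runnable: executor.submit when there is no delay, executor.schedule otherwise. The Runnable has been passed along all the way to this point.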
```java
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime,
        @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}
```
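The submit-or-schedule decision at the end of the chain maps directly onto the JDK's ScheduledExecutorService. The sketch below reproduces only that branch. MiniScheduleActual is a hypothetical name, and returning an already-completed future on rejection is a simplification made for this example, where the excerpt above reports the error through RxJavaPlugins.onError.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class MiniScheduleActual {
    // Immediate work goes through submit(); delayed work goes through schedule().
    static Future<?> scheduleActual(ScheduledExecutorService executor, Runnable run,
                                    long delayTime, TimeUnit unit) {
        Callable<Object> task = Executors.callable(run);
        try {
            if (delayTime <= 0) {
                return executor.submit(task);
            }
            return executor.schedule(task, delayTime, unit);
        } catch (RejectedExecutionException ex) {
            // Simplification for this sketch: report nothing, return a finished future.
            return CompletableFuture.completedFuture(null);
        }
    }

    // Returns elapsed milliseconds for { an immediate task, a task delayed by 50 ms }.
    static long[] demo() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            long t0 = System.nanoTime();
            scheduleActual(executor, () -> { }, 0, TimeUnit.MILLISECONDS).get();
            long t1 = System.nanoTime();
            scheduleActual(executor, () -> { }, 50, TimeUnit.MILLISECONDS).get();
            long t2 = System.nanoTime();
            return new long[] {
                TimeUnit.NANOSECONDS.toMillis(t1 - t0),
                TimeUnit.NANOSECONDS.toMillis(t2 - t1)
            };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] elapsed = demo();
        System.out.println("immediate: " + elapsed[0] + " ms, delayed: " + elapsed[1] + " ms");
    }
}
```

The delayed task never starts before its delay has elapsed, so the second measurement is at least 50 ms; the first depends only on thread start-up and scheduling.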
Class diagram