先贴上这个系列的链接。
rxjava2源码解析(一)基本流程分析
rxjava2源码解析(二)线程切换分析
上一篇说了 rxjava2
的线程切换,但是没有深入说其中的线程池。这篇我们来深扒一下。
还是先说 observeOn
,直接看源码:
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } 复制代码
这段代码我们上篇看到过,这里再重复一下。 obsererOn
是切换下游观察者线程,我们看 ObserveOnObserver
中的 onNext
方法是如何切换线程的。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.downstream = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } void schedule() { if (getAndIncrement() == 0) { //ObserveOnObserver继承了runnable接口,意味着可以当做是线程任务来执行。这里代表着在新线程中执行run方法。 worker.schedule(this); } } //ObserveOnObserver继承了runnable接口 @Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } } void drainNormal() { int missed = 1; final SimpleQueue<T> q = queue; final Observer<? super T> a = downstream; for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return; } for (;;) { ····//省略一些判断的代码 v = q.poll(); //这里就可以看到,将下游的onNext方法,切换到新线程执行。 a.onNext(v); } ··· } } } 复制代码
这是上游的处理器执行 onNext
,传到这里,使用之前设置的线程执行下游的 onNext
方法。
这个 worker
到底是什么?我们先看 schedule
r的 createWorker
方法:
public abstract Worker createWorker(); 复制代码
在 Scheduler
类中, createWorker
只是一个接口,子类会重写这个方法,我们就以 Schedulers.newThread()
这个方法创建的 Scheduler
为例,来看看这里面的原理。
//Schedulers类中的newThread静态方法,这里的hock我们暂且不理,直接返回NEW_THREAD public static Scheduler newThread() { return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD); } //Schedulers类中定义了NEW_THREAD和其他THREAD static { SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask()); COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask()); IO = RxJavaPlugins.initIoScheduler(new IOTask()); TRAMPOLINE = TrampolineScheduler.instance(); NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask()); } //NewThreadTask是Schedulers的静态内部类,继承自Callable接口,其中call方法返回一个Scheduler static final class NewThreadTask implements Callable<Scheduler> { @Override public Scheduler call() throws Exception { return NewThreadHolder.DEFAULT; } } //NewThreadHolder同样是一个静态内部类,里面只有一个静态参数DEFAULT,这里我们就找到了newThread方法返回的本尊NewThreadScheduler static final class NewThreadHolder { static final Scheduler DEFAULT = new NewThreadScheduler(); } 复制代码
如上面代码和注释所示,我们直接看 NewThreadScheduler
的源码:
/** * Schedules work on a new thread. */ public final class NewThreadScheduler extends Scheduler { final ThreadFactory threadFactory; private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler"; private static final RxThreadFactory THREAD_FACTORY; /** The name of the system property for setting the thread priority for this Scheduler. */ private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority"; static { //这里Thread.MIN_PRIORITY为1,Thread.MAX_PRIORITY为10.Thread.NORM_PRIORITY为5.如果我们不做任何更改,这里的priority的值就为5. int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY))); THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority); } public NewThreadScheduler() { this(THREAD_FACTORY); } public NewThreadScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; } @NonNull @Override public Worker createWorker() { return new NewThreadWorker(threadFactory); } } 复制代码
这里 createWorker
方法返回的是一个 NewThreadWorker
对象。我们总算找到了 worker
的来源,需要注意这里的构造参数是 threadFactory
。来看看 NewThreadWorker
的源码。
public class NewThreadWorker extends Scheduler.Worker implements Disposable { private final ScheduledExecutorService executor; volatile boolean disposed; public NewThreadWorker(ThreadFactory threadFactory) { executor = SchedulerPoolFactory.create(threadFactory); } @NonNull @Override public Disposable schedule(@NonNull final Runnable run) { return schedule(run, 0, null); } @NonNull @Override public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) { if (disposed) { return EmptyDisposable.INSTANCE; } return scheduleActual(action, delayTime, unit, null); } @NonNull public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { //hock机制 Runnable decoratedRun = RxJavaPlugins.onSchedule(run); //用一个ScheduledRunnable把传入的runnable包装一下,本质上没区别。 ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); ....//省略判断性代码 Future<?> f; try { if (delayTime <= 0) { //executor由构造方法中创建 f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; } .... } 复制代码
这里我们就可以看到,前面调用 worker.schedule(this)
,最终走到了 executor.submit(sr)
。这里的 sr
只是前面 ObserveOnObserver
的包装。 executor
在构造方法中创建。来看看 executor
是什么:
//SchedulerPoolFactory类中的静态方法 public static ScheduledExecutorService create(ThreadFactory factory) { final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory); tryPutIntoPool(PURGE_ENABLED, exec); return exec; } 复制代码
//Executors类的静态方法 public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } 复制代码
OK, executor
是一个 ScheduledThreadPoolExecutor
,标准的工作线程池。核心线程数为1, threadFactory
是前面 NewThreadWorker
构造参数中的 RxThreadFactory
。他会给 thread
按照命名格式进行命名。
public final class RxThreadFactory extends AtomicLong implements ThreadFactory { public RxThreadFactory(String prefix) { this(prefix, Thread.NORM_PRIORITY, false); } public RxThreadFactory(String prefix, int priority) { this(prefix, priority, false); } public RxThreadFactory(String prefix, int priority, boolean nonBlocking) { this.prefix = prefix; this.priority = priority; this.nonBlocking = nonBlocking; } @Override public Thread newThread(Runnable r) { StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet()); String name = nameBuilder.toString(); Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name); t.setPriority(priority); t.setDaemon(true); return t; } ... } 复制代码
observeOn
在 subscribe
方法中,新建一个 worker
对象。这个 worker
对象是根据设置的 scheduler
创建的。然后在新建一个 ObserveOnObserver
对象,将上游与之订阅。 ObserveOnObserver
的 onNext
方法中,会调用 worker.schedule(this)
,将本身作为 runnable
传入到 worker
中。 newThreadScheduler
为例,他创建的 worker
是一个 NewThreadWorker
实例。在实例构造方法中,会根据传入的 threadFactory
新建一个 ScheduledThreadPool
线程池。 NewThreadWorker
的 shedule
方法,就是将 ObserveOnObserver
作为一个 runnable
放在一个新的线程池中执行。 ObserveOnObserver
的 run
方法,就是用来执行下游的 onNext
,将数据传输下去。从而达到了,切换下游 onNext
线程的目的。
subscribeOn
是用来切换上游发射器线程。切换原理上一篇有说过,其中线程池相关跟上面 observeOn
差不多,这里就不赘述了。
上面就是 rxjava2
线程切换原理分析了,后面再有人面试问你 rxjava2
里面的线程池是哪一种,你就可以自信的说出: ScheduledThreadPool
。