SingleScheduler是RxJava2新增的Scheduler。SingleScheduler中有一个属性叫作executor,它是使用AtomicReference包装的ScheduledExecutorService。
补充:AtomicReference类的作用:AtomicReference则对应普通的对象引用,即保证你在修改对象引用时的线程安全性;对” 对象 ”进行原子操作
final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>(); 复制代码
在SingleScheduler构造函数中,Executor会调用lazySet().
/** * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any * system properties for configuring new thread creation. Cannot be null. */ public SingleScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; executor.lazySet(createExecutor(threadFactory)); } 复制代码
************ 分割线 ************
其中**lazySet()**是 AtomicReference
中的方法,用于修改引用对象:
// AtomicRefence类 /** * Sets to the given value. * * @param newValue the new value */ public final void set(V newValue) { value = newValue; } /** * Eventually sets to the given value. * * @param newValue the new value * @since 1.6 */ public final void lazySet(V newValue) { U.putOrderedObject(this, VALUE, newValue); } 复制代码
AtomicReferences中set()和lazySet()区别: set()
会 立刻修改旧值
,别的线程可以立刻看到更新后的值;而 lazySet()
不会立刻(但是 最终会
)修改旧值,别的线程看到新值的时间会延迟一些。
************ 分割线 ************
它的createExecutor()用于创建工作线程,可以看到通过SchedulerPoolFactory来创建ScheduledExecutorService。
// SingleScheduler类 static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) { return SchedulerPoolFactory.create(threadFactory); } 复制代码
通过SchedulerPoolFactory类的create(ThreadFactory factory)来创建单线程的线程
// SchedulerPoolFactory类 /** * Creates a ScheduledExecutorService with the given factory. * @param factory the thread factory * @return the ScheduledExecutorService */ public static ScheduledExecutorService create(ThreadFactory factory) { // 创建单线程 final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory); if (exec instanceof ScheduledThreadPoolExecutor) { ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec; POOLS.put(e, exec); } return exec; } 复制代码
在SingleScheduler中,每次使用ScheduledExecutorService时,其实是使用executor.get()。所以说,single拥有一个线程单例。
SingleScheduler会创建一个ScheduledWorker,ScheduledWorker使用JDK的ScheduledExecutorService作为executor。 下面是ScheduledWorker的schedule()方法,使用ScheduledExecutorService的submit()或schedule()来执行runnable。
@NonNull @Override public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { if (disposed) { return EmptyDisposable.INSTANCE; } Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, tasks); tasks.add(sr); try { Future<?> f; if (delay <= 0L) { /** * 立即执行则执行submit()方法 * Submits a value-returning task for execution and returns a * Future representing the pending results of the task. The * Future's {@code get} method will return the task's result upon * successful completion. */ f = executor.submit((Callable<Object>)sr); } else { /** * 需延迟执行,则执行schedule()方法 * Creates and executes a ScheduledFuture that becomes enabled after the * given delay. */ f = executor.schedule((Callable<Object>)sr, delay, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { dispose(); RxJavaPlugins.onError(ex); return EmptyDisposable.INSTANCE; } return sr; } 复制代码
ComputationScheduler使用FixedSchedulerPool作为线程池,并且FixedSchedulerPool被AtomicReference包装了一下。
从ComputationScheduler的源码中可以看出, MAX_THREADS
是 CPU的数目
。 FixedSchedulerPool
可以理解为拥有 固定数量的线程池
(有点类似线程池中的FixedThreadPool),数量为MAX_THREADS。
static { MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0)); ... } static int cap(int cpuCount, int paramThreads) { return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads; } 复制代码
ComputationScheduler类会创建一个EventLoopWorker。
@NonNull @Override public Worker createWorker() { return new EventLoopWorker(pool.get().getEventLoop()); } 复制代码
其中getEventLoop()是FixedSchedulerPool中的方法,返回了FixedSchedulerPool中的一个PoolWorker。
注: FixedSchedulerPool
和 EventLoopWorker
都为 ComputationScheduler
的内部类
// EventLoopWorker类中的方法 public PoolWorker getEventLoop() { int c = cores; if (c == 0) { return SHUTDOWN_WORKER; } // simple round robin, improvements to come return eventLoops[(int)(n++ % c)]; } 复制代码
PoolWorker继承自NewThreadWorker,也是线程数为1的ScheduledExecutorService。
IoScheduler使用CachedWorkerPool作为线程池,并且CacheWorkerPool也被AtomicReference包装了一下。 CachedWorkerPool是基于RxThreadFactory这个ThreadFactory来创建的。
static { ... WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority); ... NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY); ... } 复制代码
在RxThreadFactory中,由prefix和incrementAndGet()来创建新线程的名称
后续补充,先写到这儿