随着项目业务的快速扩张,你是否已经注意到很多单独的线程游离在各个模块中,一旦想做线程方面的监控与优化,代码将需要大动干戈。
相信你一定用过rxjava、okHttp这些流行的框架,它们内部都涉及线程的调度,且封装好一系列的API供你使用,你甚至完全不必关心这些线程是如何工作的。如果单独使用它们都没问题,可是如果你从项目架构的角度考虑是否应该重新考量如何使用它们。
线程池的创建使用可通过java并发包中的Executors类完成,它提供了创建线程池的常用方法。
后面会陆续介绍它们,我们先来看一个例子。
public void main() { ExecutorService executorService = Executors.newFixedThreadPool(3); for(int i = 0; i < 20; i++) { executorService.execute(new MyRunnable(i)); } } static class MyRunnable implements Runnable { int id; MyRunnable(int id) { this.id = id; } @Override public void run() { try { Thread.sleep(3000); Log.i("threadpool", "task id:"+id+" is running threadInfo:"+Thread.currentThread().toString()); } catch (InterruptedException e) { e.printStackTrace(); } } } 复制代码
示例中创建了一个固定线程数的线程池,并向其中添加20个任务。
通过log打印可以看到,日志一次打印三条,每3秒打印一次,所有任务都在名为pool-1-thread-1,pool-1-thread-2,pool-1-thread-3的线程中运行,这与我们为线程池设置的大小相吻合。导致这种现象的原因是线程池中只有三个线程,当一次性将20个任务加入到线程池中时,前三个任务优先执行,后面的任务都在等待。
而如果我们把 ExecutorService executorService = Executors.newFixedThreadPool(3);
换为 ExecutorService executorService = Executors.newCachedThreadPool();
再来看一下效果。
一瞬间任务都执行完了,可以预见使用newCachedThreadPool方式创建的线程池,执行任务时会创建足够多的线程。
接下来,我们正式来看线程池的内部的工作原理,我们分为三个部分来讲解。
常见的线程池种类上面已经提及,我们来看一下它们是如何创建出来的,我们举两个栗子。
# -> Executor.newFixedThreadPool public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } # -> Executor.newSingleThreadExecutor public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } 复制代码
可见线程池的创建都是通过ThreadPoolExecutor完成的,来看一下它的构造方法。
# -> ThreadPoolExecutor构造方法 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } 复制代码
构造方法声明的一系列参数非常重要,理解了它们线程池的基本原理你就掌握了,我们来看看他们的具体含义:
线程池创建完成后,可通过execute方法提交任务,线程池根据当前运行状态和特定参数对任务进处理,整体模型如下图:
下图清晰的展示了线程池提交任务后的流程,不再赘述。
接下来我们看看Executors工具类中常见的几种线程池使用的构造参数是怎样的。
线程池类型 | 核心线程数 | 最大线程数 | 非核心线程空闲时间 | 工作队列 |
---|---|---|---|---|
newFixedThreadPool | specific | specific | 0 | LinkedBlockingQueue |
newSingleThreadExecutor | 1 | 1 | 0 | LinkedBlockingQueue |
newCachedThreadPool | 0 | Integer.MAX_VALUE | 60s | SynchronousQueue |
newScheduledThreadPool | specific | Integer.MAX_VALUE | 0 | DelayedWorkQueue |
其中specific指的是需要使用者传入固定值。
这里需要先对阻塞队列进行额外的分析。
你有没有想过为什么要用阻塞队列,非阻塞的不行吗?
实际上阻塞队列常用于生产者-消费者模型,任务的添加是生产者,任务的调度执行是消费者,他们通常在不同的线程中,如果使用非阻塞队列,那势必需要额外的处理同步策略和线程间唤醒策略。比如当任务队列为空时,消费者线程取元素时会被阻塞,当有新的任务添加到队列中时需唤醒消费者线程处理任务。
阻塞队列的实现就是在添加元素和获取元素时设置了各种锁操作(Lock+Condition)。
另一个需要关注的是阻塞队列的容量问题,因为根据线程池处理流程图,阻塞队列容量的大小直接影响非核心线程的创建。具体来说,当阻塞队列未满时并不会创建非核心线程,而是将任务继续添加到阻塞队列后面等待核心线程(如果有)执行。
Integer.MAX_VALUE
应根据实际需求选择合适的阻塞队列,现在我们再来看这些线程池的使用场景。
在实际开发过程中不建议直接使用Executors提供的方法,如果任务规模、响应时间大致确定,应根据实际需求通过ThreadPoolExecutor各种构造函数手动创建,还自由可控制线程数、超时时间、阻塞队列、饱和策略(默认的饱和策略都是AbortPolicy即抛出异常)。
内置的饱和策略有如下四种
用户也可通过实现RejectedExecutionHandler接口自定义饱和策略,并通过ThreadPoolExecutor多参的构造函数传入。
接下来我们有必要了解一下线程池的继承结构。
了解了线程池的内部结构,在实战中我们应该如何选取线程池的大小呢?
这需要大致了解任务是CPU密集型还是IO密集型。
最佳线程数 = CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]
以单核、CPU计算和I/O操作的耗时是1:2为例,可以看到三个线程可使CPU利用率达到100%(本例来自 极客时间 --Java并发编程实战 )。
线程池的状态在整个任务处理过程中至关重要,比如添加任务时会先判断线程池是否处于运行状态,任务添加到队列后再判断运行状态,如果此时线程池已经关闭则移除任务并执行饱和策略。
我们接下来看看线程池的几种状态:
一图说明状态流转
回到文章开始的问题:RxJava和OkHttp采用哪种线程池进行调度的呢?
Schedulers.io()
和 Schedulers.computation()
分别对应了IO密集型和CPU密集型调度器,内部均使用的是 ScheduledThreadPoolExecutor
线程池,这是出于链式调用中可能存在delay等延时操作的考量而设计的。二者不同点在于最大线程数不同,对computation来说最大线程数为CPU核心数,而对io来说最大线程数为无限。 newCachedThreadPool
,上面介绍过此线程池的弊端,这可能是出于高并发量的考量而做的选择,在实际使用中可根据实际情况灵活配置。 # -> Dispatcher public Dispatcher() { } public Dispatcher(ExecutorService executorService) { this.executorService = executorService; } public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; } 复制代码