线程池是一种线程使用的模式。它通过一个监控管理器管理任务与线程之间的关系,避免了频繁创建和销毁线程带来的代价,同时还限制了线程和任务的数量,避免了因为内存溢出、线程切换、任务过多等情况系统负载过重的情况。
任务的调度通常使用同步队列,主线程将任务放入队列中,其他工作线程则从队列中获取任务来执行,如果没有任务可以执行,则工作线程将会挂起。
线程池使用了一种生产者-消费者模式,分为同步层、队列层、异步层三层。同步层的主线程负责处理工作任务并将任务放入队列层的同步队列中,异步层的工作线程则负责从队列层的同步队列中获取任务进行处理,如果任务列表为空,那么工作线程进入挂起状态。
线程池并不是没有缺点的,我们只有合理的设置线程池的参数才能避免在提高性能的同时浪费过多的资源。
线程池的整体结构并不复杂,主要接口是 Executor
和 ExecutorService
。
Executor
接口定义了一个用于向线程池提交任务的方法,也是该接口唯一一个方法。该方法对任务提交和任务执行进行了解耦,同时屏蔽了线程调度和任务执行的具体细节,让用户只需要关注如何提交任务即可。
ExecutorService
接口扩展了线程池的功能:(1)提供了其他的提交任务的方法,用于返回一个或者一批可以有返回值的 Future
对象。(2)提供了停止线程池运作的接口。
AbstractExecutorService
上层的抽象类,提供了基本的执行流程,让下层类只需要关注一个任务的执行流程,无需关注任务之间的关系。
ThreadPoolExecutor
具体的实现类,也是最复杂的部分,维护线程池的生命周期、任务的存储、任务的调度等功能。也是我们重点关注的类。
ctl
是一个 AtomicInteger
类型的变量,用于存储线程池的状态和工作线程的数量。其中高3位存储线程池的状态(runState),低29位存储工作线程的数量(workerCount)。
线程池生命周期:
workQueue
是一个 BlockingQueue
类型的变量,用于存储暂时无法执行的任务。它是一个同步队列,一次只有一个线程可以从队列中获得任务,如果队列中没有任务可以获取,那么线程将会被阻塞挂起,等待通知。
workers
是一个 HashSet
类型的变量,用于存储目前线程池中的所有可用线程。并且通过该容器持有了线程的引用,可以避免线程被垃圾回收。如果一个线程不需要被使用了,则直接将线程从该容器中移除即可,然后该线程就会被垃圾回收器回收。
threadFactory
是一个 ThreadFactory
类型的变量,用于创建线程的工厂。线程池中有一个默认的线程池工厂,我们也可以通过实现 ThreadFactory
接口来自定义如果生成一个线程。
handler
是一个 RejectedExecutionHandler
类型的变量,定义了需要拒绝一个任务时,应该做出如何的响应。当任务队列容量已满,或者线程池不是Running状态时,将会使用定义好的拒绝策略来拒绝任务。线程池中默认实现了四种拒绝策略,我们也可以通过实现 RejectedExecutionHandler
接口自定义拒绝任务时的策略。
预设的四种策略:
CallerRunsPolicy AbortPolicy DiscardPolicy DiscardOldestPolicy
keepAliveTime
是一个 long
类型的变量,定义了非核心的空闲线程等待新任务的时间,如果超过了该时间依然没有新任务分配给该线程,那么将会回收该线程。如果没有使用该参数,那么线程将永远等待新任务。
allowCoreThreadTimeOut
是一个 boolean
类型的变量,用于开启核心线程空闲时,是否具有超时回收的功能。如果为 true
,那么空闲的核心线程在超过 keepAliveTime
设定的时间后也会被回收。如果会 false
,那么将永远不会被回收,直到线程池停止。
corePoolSize
是一个 int
类型的变量,用于表示预设的核心线程数量,在创建线程池的时候需要指定该参数的值。
maximumPoolSize
是一个 int
类型的变量,用于表示预设的线程数量,在创建线程池的时候需要指定该参数的值,它与 corePoolSize
的区别在于, corePoolSize
只是核心线程的数量,而其是核心线程+非核心线程的数量。
线程池中默认的拒绝策略,默认的拒绝策略为 AbortPolicy
,即抛出异常。
线程池中任务的提交与任务的执行是解耦的,用户只需要关心任务的提交即可,具体任务的执行将会由线程池自行决定以何种方式完成:直接创建线程执行任务、放入阻塞队列、拒绝任务。
任务的提交是由 execute
方法来完成的,该方法的流程如下图所示:
任务的缓冲是通过阻塞队列来实现的,线程获取任务的途径有两种:(1) 创建线程时传入的首个任务。(2)从阻塞队列中获取任务。
当线程从阻塞队列中获取任务时,如果队列为空,那么该线程将会被挂起,等待有新的任务时才会被唤醒去获取新的任务。阻塞队列是一种同步队列,使用了独占锁保证了一次只有一个线程可以从队列中获取任务,避免不同的线程获取同一个任务。
Java中提供了多种阻塞队列进行选择,每一种阻塞队列都具有各自的特点。
名称 | 描述 |
---|---|
ArrayBlockingQueue | 一种使用数组实现的先进先出的有界阻塞队列。支持公平锁和非公平锁 |
LinkedBlockingQueue | 一种使用链表实现的先进先出的有界阻塞队列。默认的容量是 Interge.MAX_VALUE ,相比于 ArrayBlockingQueue 具有更高的吞吐量,但是却丢失了随机存储的特性。 |
LinkedBlockingDeque | 一种使用链表实现的具有双向存取功能的有界阻塞队列。在高并发下,相比于 LinkedBlockingQueue 可以将锁竞争降低最多一半 |
PriorityBlockingQueue | 一种提供了优先级排序的无界阻塞队列。如果没有提供具体的排列方法,那么将会使用自然排序进行排序,会抛出 OOM 异常。 |
SynchronousQueue | 一种不存储任务的同步队列。每一次的插入操作都必须等待其他线程进行相应的删除操作。支持公平锁和非公平锁 |
LinkedTransferQueue | 一种使用链表实现的无界阻塞队列。 |
DelayQueue | 一种无界的延时队列,可以设置每个元素需要等待多久才能从队列中取出。内部使用了领导者模式来对元素进行管理。 |
线程获取任务的途径有两种,一种是创建线程时分配的首次任务,另一种从阻塞队列中获取任务。线程执行完任务之后就会去阻塞队列中重写获取任务,这是通过一个 while
循环来进行的。
当创建并启动一个线程后,就会执行 runWorker
方法,该方法会通过一个 while
循环获取需要执行的任务,当获取到任务的时候就会调用任务的 run
方法,从而执行获取的任务。当执行完任务时,又会再次进入 while
循环,直到线程池被停止、或者线程被阻塞、或者线程需要被回收。
任务的申请是通过 getTask
方法来完成的,执行流程如下:
当线程获取任务返回null时,线程池就会将该线程回收。
回收线程的情况:(1)线程池不是RUNNING状态。(2)判断线程是否为可回收的线程,非核心线程则为可回收的线程,如果设置了 allowCoreThreadTimeOut
,那么核心线程也是可回收的,如果可回收线程在规定时间内没有获取到可执行的任务,那么就会在下一轮循环中将其回收。
任务的拒绝是为了保证线程池的安全,如果阻塞队列已满,并且线程数量也达到了预设的 maximumPoolSize
时,那么就会直接拒绝该任务。
拒绝策略是通过一个 RejectedExecutionHandler
来实现的,我们可以通过实现该接口,自定义一个任务的拒绝策略。
线程池内部也提供了四种可选的任务拒绝策略,默认的任务拒绝策略是 AbortPolicy
,即拒绝任务时会抛出一个异常。
名称 | 描述 |
---|---|
CallerRunsPolicy | 使用执行 execute 方法的线程执行该被拒绝的任务,如果线程池已经被关闭,则丢弃这个任务 |
AbortPolicy | 默认的拒绝策略,直接抛出异常 |
DiscardPolicy | 什么都不做,直接丢弃该任务,不会有任何记录和提示 |
DiscardOldestPolicy | 丢弃任务队列中最老的任务,然后将该任务重新进行提交 |
在线程池中,设计了一个 Worker
内部类来管理、监控线程的状态。
Worker
类继承自 AQS
,同时实现了 Runnable
接口。从设计上可以看出,该类的对象即是一个锁,又是一个存有线程的任务。
创建线程的方法是 addWorker(Runnable firstTask, boolean core)
,其中firstTask表示是否具有一个需要立即执行的首个任务,core表示创建的线程是否为核心线程。
该方法的流程:
创建完线程之后,因为线程是需要复用的,并不是执行完一次任务就销毁,所以需要将创建好的线程进行强引用,使垃圾收集器器无法对其进行回收。
线程池中是通过一个 HashSet
容器将所有的有效线程保存起来,如果需要回收线程,则只需要将该线程从容器中移除即可。将线程的引用移除,垃圾收集器只要发现其不存在任何一个引用就会将其回收。
线程池中的线程并不是永久存在的,那么线程池就需要将没用的线程进行回收。通过上一节我们知道只需要解除引用即可将线程回收,但是线程池是如何判断哪些线程是需要回收的呢?
其实是在线程获取任务之时来判断的,如果在获取任务是,返回的是一个null,那么则说明这个线程是需要被回收的。
// runWorker() try { while (task != null || (task = getTask()) != null) { // ....... } } finally { processWorkerExit(w, completedAbruptly); } 复制代码
线程启动之后,就会执行该方法,会循环的使用 getTask()
方法从任务队列中获取任务。获取任务时会出现三种情况:
当返回null的时候,跳出 while
循环,然后就会执行 processWorkerExit
方法,该方法会导致线程丢失引用,从而被垃圾收集器回收。
在Java的线程模型中,并不存在真正的中断。而是设置一个中断标志位,由用户自行决定如何响应中断标志位的变化。
当我们执行 shutdowmNow
方法时,会修改线程池的状态为STOP,中断所有的线程,包括空闲线程和运行中的线程,并清空任务队列,最后会尝试修改线程池的状态为TERMINATED。
线程池中的变量的值并不是初始化完成后就固定的,线程池中提供了相关的接口可用于线程池在运行中修改它的部分参数。这里只例举出两种比较重要的。
该方法可以动态设置核心线程的数量。线程池会根据线程当前的状态和该方法传入的值来调整线程池中的线程的数量。
如果新核心线程数大于旧核心线程数,线程池并不会直接添加对应差值的线程数。而是根据差值和阻塞的任务数量来决定需要创建的核心线程数量,如果阻塞队列中没有需要执行的任务,那么将不会提前创建新的线程。每次创建一个线程都会检测阻塞队列中是否还存在任务,如果不存在则不再继续创建线程。