记录一下有关Java线程池的学习笔记,主要内容来自于《Java并发编程的艺术》,增加了一些自己的理解和实际问题中的处理。
线程同样是一个对象,对象的创建和销毁都需要消耗系统资源(类加载、垃圾回收)。频繁地创建线程会 消耗系统资源,降低系统稳定性 。 使用线程池可以对线程进行统一分配、调优和监控。
线程池的调度流程主要涉及三个概念: 核心线程池
、 任务队列
和 最大线程池
,这三个概念分别对应线程池构造参数中的 corePoolSize
、 workQueue
和 maximumPoolSize
. 结合下图,说明提交一个新任务到线程池时,线程池的处理流程:
判断线程池中线程数目是否达到 核心线程大小
,未达到则 创建新线程 执行任务;达到,则转(2);
说明:
判断 队列是否已满
,未满则将任务 放入队列 ;满,则转(3);
判断线程池中线程数目是否达到 最大线程池数目
,未达到 创建新线程 来处理任务;达到,则采取对应的 任务拒绝策略 。
Jdk中定义了任务执行接口ExecutorService,并提供了实现类ThreadPoolExecutor。创建示例:
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1); ExecutorService executorService = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS, queue, new ThreadFactoryBuilder().build(), new ThreadPoolExecutor.CallerRunsPolicy()); 复制代码
execute
用于提交无返回值的任务, submit
用于提交有返回值的任务(与Callable、Future结合)。 示例:
ExecutorService executorService = Executors.newFixedThreadPool(3); executorService.execute(() -> { System.out.println("HelloWorld1"); }); Future<String> future = executorService.submit(() -> { return "HelloWorld2"; }); try { System.out.println(future.get()); } catch (InterruptedException | ExecutionException ex) { ex.printStackTrace(); } 复制代码
可以使用 shutdown
和 shutdownNow
来关闭线程池。
实际测试:
ThreadPoolExecutor还提供了多种API对线程池的状态进行监控,需要注意的是这些API都需要加锁。常见API有:
通过 ThreadPoolExecutor
类可以创建一个线程池,作为 ExecutorService
接口的实现类。
ThreadPoolExecutor类提供了多重构造器方法,其中参数最齐全的构造器如下,参数包括 corePoolSize
(核心线程池大小), maximumPoolSize
(线程池最大线程数), keepAliveTime
(空闲线程存活时间), unit
(空闲线程池存活时间单位), workQueue
(线程池任务队列), threadFactory
(线程创建工厂), handler
(任务拒绝策略)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 复制代码
线程池的第一阶段可以理解为预热阶段,就是要 先将核心线程池装满 (不论线程池中是否有空闲线程,且核心线程数目为0时等价于为1)。
如果执行prestartAllCoreThreads()方法,会直接创建并启动所有核心线程。
((ThreadPoolExecutor) executorService).prestartAllCoreThreads(); 复制代码
核心线程数目不宜太小,比如核心线程池为1,那么假若线程执行时里有休眠或者写redis超时等,这会使得后续所有任务都处于等待状态(工作队列较大的情况下),可以考虑将核心线程池数目设为 CPU核数 ,这样保证预热阶段就可以每个核运行一个线程。(任务映射到线程,线程映射到CPU)
工作队列是用于存放等待执行任务的阻塞队列,可以根据实际场景来决策选择什么样的队列。
阻塞队列
是指,当队列满时往队列中放元素的线程会阻塞;当队列空时从队列中取元素的线程会阻塞。主要有如下集中可选的队列:
5.2.2
工作队列应尽量使用 有界队列 ,无界队列一是容易导致内存泄露,二是最大线程池参数无效。
线程池最大线程数,只有当工作队列有界且容量有限的情况下,该参数才有意义。如果队列很大,会导致任务一直被放入队列中,而不会创建新的线程去执行。
首先最大线程池数目只有在工作队列有界时才有效,另外最大线程池数目设置的意义在于充分利用CPU资源、避免任务等待时间过长。
用于指定创建线程的工厂,比如可以使用可以google guaua包中提供的ThreadFactoryBuilder来快速的给线程池中线程设置自定义的线程名,一般使用默认的即可。
new ThreadFactoryBuilder().build() 复制代码
当线程池经过了第三阶段,即线程数目已经达到最大线程数目,那样的话任务将被拒绝添加到线程池中,根据拒绝策略执行任务。
常见拒绝策略有:
CallerRunsPolicy
个人觉得如果不是特别占CPU资源的任务,使用CallerRunsPolicy策略比较合适,能保证任务被执行。
当线程池中线程处于空闲状态超过一定时间时会销毁该线程,一般常见设置是60s存活时间。
工厂类Executors提供了多个用于创建典型线程池的API,接下来详细介绍几种典型线程池。
Executors提供了两种FixedThreadPool创建API,分别是
public static ExecutorService newFixedThreadPool(int nThreads); public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) ; 复制代码
第一种API源码如下
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 复制代码
观察源码可以分析,FixedThreadPool有如下特点:
FixedThreadPool最大的特点是限制了线程池最大线程数目,比较适用于任务 对系统资源消耗较大、负载比较重 的服务器。
Executors工厂类提供了两种CachedThreadPool的创建API:
public static ExecutorService newCachedThreadPool(); public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory); 复制代码
第一种API的源码如下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 复制代码
SynchronousQueue被称为 无缓冲阻塞队列
,用于在两个线程间移交元素。
经过测试SynchronousQueue有如下特点:
观察源码可以分析得到,CachedThreadPool具有以下特点:
根据CachedThreadPool特点可以分析出,其适用于处理大量 短期任务 、或者负载较轻的服务器。(主要是不需要限制线程数目的场景)
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } 复制代码
可以看出在ThreadPoolExecutor又包装了一层FinalizableDelegatedExecutorService。
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } } 复制代码
FinalizableDelegatedExecutorService又如下特点:
static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future<?> submit(Runnable task) { return e.submit(task); } public <T> Future<T> submit(Callable<T> task) { return e.submit(task); } public <T> Future<T> submit(Runnable task, T result) { return e.submit(task, result); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return e.invokeAll(tasks); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } } 复制代码
可以看出DelegatedExecutorService:
结合FinalizableDelegatedExecutorService和内部ThreadPoolExecutor可以得到SingleThreadExecutor具备以下特点:
SingleThreadExecutor适用于需要顺序执行任务的场景,但对应地并发量会降低、QPS也会降低。
配置线程池大小主要需要考虑 任务类型
、 任务执行时间
、 机器负载
。
根据任务消耗资源可以将任务分为 CPU密集型任务
、 IO密集型任务
、 混合型任务
。
常见的代码处理(计算、逻辑判断等)任务都属于CPU密集型任务,而文件读写(打印日志)、网络占用等属于IO密集型任务。
CPU密集型任务
。对于CPU密集型任务,建议 降低最大线程数、使之小于CPU核数 ,线程过多一是频繁在线程间切换上下文会带来额外开销,而是容易使CPU满负荷运行。 IO密集型任务
。对于IO密集型任务,建议 提高线程数(比如2*CPU核数) ,因为活跃线程往往因为正在读写IO而没有持有CPU,此时可充分利用CPU处理其它线程。 混合型任务
。根据任务占用CPU和IO时间,看能否将任务分解,当任务占用CPU和IO时间相差不大时,分解并发度提升较大。 一般情况下,一类任务使用一个线程池,这样可以使任务间在获取CPU时更加公平,避免较短任务等待时间过长。
需要注意的是,影响 任务执行时间
除了任务本身外,往往还需特别关注 休眠操作
、 任务依赖性
。
长任务
。任务执行时间较长时,建议 一定程度增大最大线程数目 ,来 增加并发能力(线程数小于CPU核数情况下),同时可以减少后面任务的等待时间(但是会由于CPU在线程间切换而增加一定额外开销) 。 短任务
。任务执行时间较短时,建议 降低线程数 。因为执行较快,少量线程就可以胜任任务,并且当任务突增时,由于线程数较少可以 避免CPU被打满 。 任务依赖性
。任务中如果有 访问redis、数据库等操作 ,需要额外注意, 对其他模块或组件的依赖将可能导致任务执行时间突增 。(1)首先 避免核心线程池设为0或1 ,假如核心线程只有1个,一个任务超时,将导致后续所有任务都延迟执行;(2)根据实际情况,可 适度增大最大线程池 。 休眠操作
。任务内如果在某些情况下需要进行休眠的话(非常不建议使用休眠,如必要,建议间歇性休眠并设置最大休眠时间),也会导致任务执行时间增加。与上一点相同,注意核心线程池大小。 在负载较重的机器,通过限制线程池数目来降低机器压力。影响 机器负载
的因素一般是 机器配置
,如果机器有多个服务混布,也会导致机器压力较大。
机器配置 服务混布 线程池的种类和数目
问题:当时由于对线程池参数理解不够,将核心线程池大小设为了0,同时任务出现了访问redis超时的现象,这使得在请求数并不多的情况下就出现了大量任务执行延迟的现象。
解决:将核心线程池和最大线程池数目都修改成了1/2 CPU核数;并且修改redis连接池配置,一定程度降低连接redis的retryInterval和timeout时间。
问题:因为客户端错误使用,使得qps突增(几毫秒一次、甚至一毫米数次请求),并持续了近10分钟。10分钟内服务器CPU被打满,服务呈现一定程度不可用,存在大面积掉线现象。
分析:线程池在理论上是可以对请求进行削峰的,但仍然造成了CPU打满的现象。分析原因有: