转载

Java并发编程学习笔记----线程池

记录一下有关Java线程池的学习笔记,主要内容来自于《Java并发编程的艺术》,增加了一些自己的理解和实际问题中的处理。

1. 线程池概念和优点

1.1 为什么需要线程池

线程同样是一个对象,对象的创建和销毁都需要消耗系统资源(类加载、垃圾回收)。频繁地创建线程会 消耗系统资源,降低系统稳定性 。 使用线程池可以对线程进行统一分配、调优和监控。

1.2 优点

  • 降低资源消耗 。通过重复利用已经创建的线程,来节约线程创建和销毁的资源消耗。
  • 提高相应速度 。任务提交时,可以使用线程池中线程执行任务,减少了等待线程创建的时间。
  • 使任务趋于平缓 。当有大量任务到来时,通过线程池的调度策略,先将任务放入队列中,控制同时执行的线程数,避免对系统造成巨大压力。
  • 监控和调优等。

2. 线程池调度流程

线程池的调度流程主要涉及三个概念: 核心线程池任务队列最大线程池 ,这三个概念分别对应线程池构造参数中的 corePoolSizeworkQueuemaximumPoolSize . 结合下图,说明提交一个新任务到线程池时,线程池的处理流程:

(1)第一阶段(预热阶段):核心线程池

判断线程池中线程数目是否达到 核心线程大小 ,未达到则 创建新线程 执行任务;达到,则转(2);

说明:

  • 经过测试,只看线程数目,不管线程是否活跃(不活跃线程达到最大存活时间才会被销毁)。
  • 经过测试,如果核心线程池大小为0,等同于核心线程池大小为1的情况,即任务提交时会新创建一个线程,然后再放入队列。

(2)第二阶段:工作队列

判断 队列是否已满 ,未满则将任务 放入队列 ;满,则转(3);

  • 这也就要求队列书尽量避免使用无界队列,有造成内存泄露的风险。

(3)第三阶段:最大线程池

判断线程池中线程数目是否达到 最大线程池数目 ,未达到 创建新线程 来处理任务;达到,则采取对应的 任务拒绝策略

  • 拒绝的意思是拒绝将任务加入到线程池,并不是说任务一定被拒绝。
Java并发编程学习笔记----线程池

3. 线程池使用

3.1 线程池创建

Jdk中定义了任务执行接口ExecutorService,并提供了实现类ThreadPoolExecutor。创建示例:

BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
ExecutorService executorService = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS, 
        queue, new ThreadFactoryBuilder().build(), new ThreadPoolExecutor.CallerRunsPolicy());
复制代码

3.2 任务提交

execute 用于提交无返回值的任务, submit 用于提交有返回值的任务(与Callable、Future结合)。 示例:

ExecutorService executorService = Executors.newFixedThreadPool(3);
executorService.execute(() -> {
    System.out.println("HelloWorld1");
});

Future<String> future = executorService.submit(() -> {
    return "HelloWorld2";
});
try {
    System.out.println(future.get());
} catch (InterruptedException | ExecutionException ex) {
    ex.printStackTrace();
}
复制代码

3.3 线程池关闭

可以使用 shutdownshutdownNow 来关闭线程池。

实际测试:

  • shutdown不会终止活跃线程;
  • 而shutdownNow会给活跃线程发送Interrupt信号,需要线程捕获InterruptedException。

3.4 线程池监控

ThreadPoolExecutor还提供了多种API对线程池的状态进行监控,需要注意的是这些API都需要加锁。常见API有:

  • getActiveCount:获取活跃线程数
  • getPoolSize:获取线程池中线程数目
  • getLargestPoolSize: 线程池中线程数最高纪录
  • getTaskCount:获取线程池中提交过任务总数
  • getCompletedTaskCount:已完成任务数

4. 线程池核心参数详解

通过 ThreadPoolExecutor 类可以创建一个线程池,作为 ExecutorService 接口的实现类。

4.1 参数列表

ThreadPoolExecutor类提供了多重构造器方法,其中参数最齐全的构造器如下,参数包括 corePoolSize (核心线程池大小), maximumPoolSize (线程池最大线程数), keepAliveTime (空闲线程存活时间), unit (空闲线程池存活时间单位), workQueue (线程池任务队列), threadFactory (线程创建工厂), handler (任务拒绝策略)

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
复制代码

4.2 corePoolSize(核心线程池数目)

线程池的第一阶段可以理解为预热阶段,就是要 先将核心线程池装满 (不论线程池中是否有空闲线程,且核心线程数目为0时等价于为1)。

如果执行prestartAllCoreThreads()方法,会直接创建并启动所有核心线程。

((ThreadPoolExecutor) executorService).prestartAllCoreThreads();
复制代码

理解

核心线程数目不宜太小,比如核心线程池为1,那么假若线程执行时里有休眠或者写redis超时等,这会使得后续所有任务都处于等待状态(工作队列较大的情况下),可以考虑将核心线程池数目设为 CPU核数 ,这样保证预热阶段就可以每个核运行一个线程。(任务映射到线程,线程映射到CPU)

4.3 workQueue(工作队列)

工作队列是用于存放等待执行任务的阻塞队列,可以根据实际场景来决策选择什么样的队列。

阻塞队列 是指,当队列满时往队列中放元素的线程会阻塞;当队列空时从队列中取元素的线程会阻塞。主要有如下集中可选的队列:

5.2.2

理解

工作队列应尽量使用 有界队列 ,无界队列一是容易导致内存泄露,二是最大线程池参数无效。

4.4 maximumPoolSize(最大线程数)

线程池最大线程数,只有当工作队列有界且容量有限的情况下,该参数才有意义。如果队列很大,会导致任务一直被放入队列中,而不会创建新的线程去执行。

理解

首先最大线程池数目只有在工作队列有界时才有效,另外最大线程池数目设置的意义在于充分利用CPU资源、避免任务等待时间过长。

4.5 threadFactory(线程工厂)

用于指定创建线程的工厂,比如可以使用可以google guaua包中提供的ThreadFactoryBuilder来快速的给线程池中线程设置自定义的线程名,一般使用默认的即可。

new ThreadFactoryBuilder().build()
复制代码

4.6 RejectedExecutionHandler(拒绝策略)

当线程池经过了第三阶段,即线程数目已经达到最大线程数目,那样的话任务将被拒绝添加到线程池中,根据拒绝策略执行任务。

常见拒绝策略有:

CallerRunsPolicy

理解

个人觉得如果不是特别占CPU资源的任务,使用CallerRunsPolicy策略比较合适,能保证任务被执行。

4.7 keepAliveTime和timeUnit(存活时间)

当线程池中线程处于空闲状态超过一定时间时会销毁该线程,一般常见设置是60s存活时间。

5. 线程工厂

工厂类Executors提供了多个用于创建典型线程池的API,接下来详细介绍几种典型线程池。

5.1 FixedThreadPool(固定数目线程池)

5.1.1 源码

Executors提供了两种FixedThreadPool创建API,分别是

public static ExecutorService newFixedThreadPool(int nThreads);
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) ;
复制代码

第一种API源码如下

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
复制代码

5.1.2 特点

观察源码可以分析,FixedThreadPool有如下特点:

  • (1)核心线程池数目和最大线程数目相等,也就是说 永远不会进入线程池调度的第三阶段
  • (2)线程存活时间为0, 空闲线程会被立即销毁
  • (3)无界阻塞队列,即任务永远不会被拒绝, 队列存在OOM异常的可能
  • (4)当线程数目大于等于CPU核数,且任务到来速率大于任务处理速率时(CPU密集型任务),可能会导致CPU被打满。

5.1.3 适用场景

FixedThreadPool最大的特点是限制了线程池最大线程数目,比较适用于任务 对系统资源消耗较大、负载比较重 的服务器。

5.2 CachedThreadPool

5.2.1 源码

Executors工厂类提供了两种CachedThreadPool的创建API:

public static ExecutorService newCachedThreadPool();
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory);
复制代码

第一种API的源码如下:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
复制代码

5.2.2 SynchronousQueue(同步队列)

SynchronousQueue被称为 无缓冲阻塞队列 ,用于在两个线程间移交元素。

经过测试SynchronousQueue有如下特点:

  • (1)put和take必须在不同的线程中,同一个线程无法先后offer、take。可以理解为被offer元素必须被take后put方法才能退出,所以无法在同一个线程中先后offer、take。
  • (2)在任何时候判断队列都是满的,remainingCapacity()方法总是返回0。比如直接使用add方法向队列添加元素,会抛出java.lang.IllegalStateException: Queue full。 猜测正是因为这点使得使用SynchronousQueue作工作队列的线程池没有第二阶段,直接进入第三阶段

5.2.3 特点

观察源码可以分析得到,CachedThreadPool具有以下特点:

  • (1)工作队列使用SynchronousQueue,判断工作队列总是满的,也就说的当核心线程池满后会不停地创建线程直至达到最大线程数目,而 跳过第二阶段
  • (2)核心线程池数为0(等价于1),最大线程数目是最大整数,也就说任务提交时会优先使用非活跃线程(因为核心线程池已满,且非活跃线程会有60s存活时间,不会直接新建线程来执行任务),没有活跃线程则会新建线程。当 任务到来速率大于任务处理速率 时,有 创建过多线程打满CPU的可能

5.2.4 适用场景

根据CachedThreadPool特点可以分析出,其适用于处理大量 短期任务 、或者负载较轻的服务器。(主要是不需要限制线程数目的场景)

5.3 SingleThreadExecutor(单线程池)

5.3.1 源码

(1)SingleThreadExecutor源码如下

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
复制代码

可以看出在ThreadPoolExecutor又包装了一层FinalizableDelegatedExecutorService。

(2)FinalizableDelegatedExecutorService源码如下:

static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
        super(executor);
    }
    protected void finalize() {
        super.shutdown();
    }
}
复制代码

FinalizableDelegatedExecutorService又如下特点:

  • 继承了DelegatedExecutorService(代理线程池)
  • 重写了finalize方法,在对象被回收时主动关闭线程池

(3)DelegatedExecutorService源码如下:

static class DelegatedExecutorService extends AbstractExecutorService {
    private final ExecutorService e;
    DelegatedExecutorService(ExecutorService executor) { e = executor; }
    public void execute(Runnable command) { e.execute(command); }
    public void shutdown() { e.shutdown(); }
    public List<Runnable> shutdownNow() { return e.shutdownNow(); }
    public boolean isShutdown() { return e.isShutdown(); }
    public boolean isTerminated() { return e.isTerminated(); }
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        return e.awaitTermination(timeout, unit);
    }
    public Future<?> submit(Runnable task) {
        return e.submit(task);
    }
    public <T> Future<T> submit(Callable<T> task) {
        return e.submit(task);
    }
    public <T> Future<T> submit(Runnable task, T result) {
        return e.submit(task, result);
    }
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        return e.invokeAll(tasks);
    }
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        return e.invokeAll(tasks, timeout, unit);
    }
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        return e.invokeAny(tasks);
    }
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return e.invokeAny(tasks, timeout, unit);
    }
}
复制代码

可以看出DelegatedExecutorService:

  • 构造器需要传入一个线程池对象,并且内部方法都映射成对传入线程池对象方法的执行,相当于一个 包装类、代理
  • 只可以执行包装类提供的方法,也就是说被DelegatedExecutorService包装的线程池不论其具体类型如何,都不能执行其特殊方法, 只能执行包装类提供的ExecutorService方法

5.3.2 SingleThreadExecutor特点

结合FinalizableDelegatedExecutorService和内部ThreadPoolExecutor可以得到SingleThreadExecutor具备以下特点:

  • 线程池的核心线程池数目和最大线程数目固定,且都为1。也就说 任意时刻只会一个任务被执行
  • 空闲线程存活时间为0,也就是说空闲线程会被即时销毁;
  • 线程池被GC回收时会主动关闭线程池中线程

5.3.3 适用场景

SingleThreadExecutor适用于需要顺序执行任务的场景,但对应地并发量会降低、QPS也会降低。

6. 合理使用线程池

6.1 核心线程池大小和最大线程数目配置

配置线程池大小主要需要考虑 任务类型任务执行时间机器负载

(1)任务类型

根据任务消耗资源可以将任务分为 CPU密集型任务IO密集型任务混合型任务

常见的代码处理(计算、逻辑判断等)任务都属于CPU密集型任务,而文件读写(打印日志)、网络占用等属于IO密集型任务。

  • CPU密集型任务 。对于CPU密集型任务,建议 降低最大线程数、使之小于CPU核数 ,线程过多一是频繁在线程间切换上下文会带来额外开销,而是容易使CPU满负荷运行。
  • IO密集型任务 。对于IO密集型任务,建议 提高线程数(比如2*CPU核数) ,因为活跃线程往往因为正在读写IO而没有持有CPU,此时可充分利用CPU处理其它线程。
  • 混合型任务 。根据任务占用CPU和IO时间,看能否将任务分解,当任务占用CPU和IO时间相差不大时,分解并发度提升较大。

(2)任务执行时间

一般情况下,一类任务使用一个线程池,这样可以使任务间在获取CPU时更加公平,避免较短任务等待时间过长。

需要注意的是,影响 任务执行时间 除了任务本身外,往往还需特别关注 休眠操作任务依赖性

  • 长任务 。任务执行时间较长时,建议 一定程度增大最大线程数目 ,来 增加并发能力(线程数小于CPU核数情况下),同时可以减少后面任务的等待时间(但是会由于CPU在线程间切换而增加一定额外开销)
  • 短任务 。任务执行时间较短时,建议 降低线程数 。因为执行较快,少量线程就可以胜任任务,并且当任务突增时,由于线程数较少可以 避免CPU被打满
  • 任务依赖性 。任务中如果有 访问redis、数据库等操作 ,需要额外注意, 对其他模块或组件的依赖将可能导致任务执行时间突增 。(1)首先 避免核心线程池设为0或1 ,假如核心线程只有1个,一个任务超时,将导致后续所有任务都延迟执行;(2)根据实际情况,可 适度增大最大线程池
  • 休眠操作 。任务内如果在某些情况下需要进行休眠的话(非常不建议使用休眠,如必要,建议间歇性休眠并设置最大休眠时间),也会导致任务执行时间增加。与上一点相同,注意核心线程池大小。

(3)机器负载

在负载较重的机器,通过限制线程池数目来降低机器压力。影响 机器负载 的因素一般是 机器配置 ,如果机器有多个服务混布,也会导致机器压力较大。

机器配置
服务混布
线程池的种类和数目

6.2 其它配置

  • 尽量使用 自定义的线程池 ,这样可以自定义策略。
  • 工作队列建议使用 有界队列 (避免OOM、使最大线程数目生效),队列数可以尽可能大些,防止队列一下子被填满;
  • 拒绝策略建议使用 CallerRunsPolicy 保证任务被执行;

6.3 实际问题举例

(1)线程池数目过小 + 访问redis超时,导致所有任务延迟处理

问题:当时由于对线程池参数理解不够,将核心线程池大小设为了0,同时任务出现了访问redis超时的现象,这使得在请求数并不多的情况下就出现了大量任务执行延迟的现象。

解决:将核心线程池和最大线程池数目都修改成了1/2 CPU核数;并且修改redis连接池配置,一定程度降低连接redis的retryInterval和timeout时间。

(2)短任务 + 请求突增,导致CPU被打满

问题:因为客户端错误使用,使得qps突增(几毫秒一次、甚至一毫米数次请求),并持续了近10分钟。10分钟内服务器CPU被打满,服务呈现一定程度不可用,存在大面积掉线现象。

分析:线程池在理论上是可以对请求进行削峰的,但仍然造成了CPU打满的现象。分析原因有:

  • i. 突增的请求是一种 短任务 ,认为是一种CPU密集型任务;
  • ii. 处理突增请求的线程池最大线程数达到CPU核数(N),这使得线程池在10分钟内一直有N个活跃线程。
  • iii. 机器同时有 混布服务 、服务中还有 多种不同类型线程池 。 综合以上这些因素,使得客户端请求突增时,每一个CPU在绝大部分时间处理突增的异常请求,导致CPU被打满,合理请求无法正常处理或者延时处理。

解决办法:

  • 服务器对处理客户端请求进行 限流 ,丢弃一部分次要消息、限制合理消息的速率。
  • 降低每种线程池的最大线程数 ,出于有混布服务,将当前服务上所有线程池最大线程数目控制到2/3 CPU核数。
原文  https://juejin.im/post/5e784466f265da574b793818
正文到此结束
Loading...