转载

ThreadPoolExecutor 源码解读

ThreadPoolExecutor 源码解读

傻瓜源码-内容简介

傻瓜源码-内容简介

【职场经验】(持续更新)

如何日常学习、如何书写简历、引导面试官、系统准备面试、选择offer、提高绩效、晋升TeamLeader.....

【源码解读】(持续更新)<br/> 1. 源码选材:Java架构师必须掌握的所有框架和类库源码 <br/> 2. 内容大纲:按照“企业应用Demo”讲解执行源码:总纲“阅读指南”、第一章“源码基础”、第二章“相关Java基础”、第三章“白话讲源码”、第四章“代码解读”、第五章“设计模式”、第六章“附录-面试习题、相关JDK方法、中文注释可运行源码项目”

3. 读后问题:粉丝群答疑解惑

已收录: HashMap 、 ReentrantLock 、 ThreadPoolExecutor 、 《Spring源码解读》 、 《Dubbo源码解读》 .....

【面试题集】(持续更新)<br/>1. 面试题选材:Java面试常问的所有面试题和必会知识点<br/>2. 内容大纲:第一部分”注意事项“、第二部分“面试题解读”(包括:”面试题“、”答案“、”答案详解“、“实际开发解说”)

3. 深度/广度:面试题集中的答案和答案详解,都是对齐一般面试要求的深度和广度

4. 读后问题:粉丝群答疑解惑

已收录: Java基础面试题集 、 Java并发面试题集 、 JVM面试题集 、 数据库(Mysql)面试题集 、 缓存(Redis)面试题集 .....
【粉丝群】(持续更新) <br/> 收录:阿里、字节跳动、京东、小米、美团、哔哩哔哩等大厂内推

:stuck_out_tongue: 作者介绍:Spring系源码贡献者、世界五百强互联网公司、TeamLeader、Github开源产品作者

:stuck_out_tongue: 作者微信:wowangle03 (企业内推联系我)

加入我的粉丝社群,阅读更多内容。从学习到面试,从面试到工作,从 coder 到 TeamLeader,每天给你答疑解惑,还能有第二份收入!

ThreadPoolExecutor 源码解读

第 1 章 阅读指南

  • 本文基于 open-jdk 1.8 版本。
  • 本文根据” Demo “解读源码。
  • 本文建议分为两个学习阶段,掌握了第一阶段,再进行第二阶段;

    • 第一阶段,理解章节“源码解读”前的所有内容。即掌握 IT 技能:熟悉 ThreadPoolExecutor 原理。
    • 第二阶段,理解章节“源码解读”(包括源码解读)之后的内容。即掌握 IT 技能:精读 ThreadPoolExecutor 源码。
  • 建议按照本文内容顺序阅读(内容前后顺序存在依赖关系)。
  • 阅读过程中,如果遇到问题,记下来,后面不远的地方肯定有解答,或者在粉丝群里提问。
  • 阅读章节“源码解读”时,建议获得中文注释源码项目配合本文,Debug 进行阅读学习。
  • 源码项目中的注释含义:

    • “ Demo ”在源码中,会标注“ // ThreadPoolExecutor Demo ”。
    • 在源码中的不易定位到的主线源码,会标注 “ // tofix 主线 ”。
  • 以下注释的源码,暂时不深入讲解:

    • 在执行“ Demo ”过程中,没有被执行到的源码(由于遍历空集合、 if 判断),会标注“ / * Demo不涉及 / ”。

第 2 章 Demo

// ThreadPoolExecutor Demo
public class ThreadPoolExecutorTest {

    @Test
    public void testThreadPoolExecutor() {
        // 初始化线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1,              // 指定 corePoolSize (核心线程数)
                1,              // 指定的 maximumPoolSize (线程池可以创建的最大线程数)
                0,             // 指定的 keepAliveTime(线程空闲的情况下,可以存活的时间)
                TimeUnit.SECONDS, // keepAliveTime 的时间单位
                new LinkedBlockingQueue<Runnable>()// 指定的 workQueue(装任务的队列)
        );
        // 初始化任务
        Task task = new Task();
        // 向线程池提交任务
        executor.execute(task);
        
        // 关闭线程池有以下两种方法:
        
        // 关闭线程池:
        // 1.线程池不会接受新的任务,执行拒绝任务处理器(默认是抛出异常)
        // 2.线程池执行完已经提交了的任务再结束
        executor.shutdown();
        
        // 关闭线程池:
        // 1.线程池不会接受新的任务,执行拒绝任务处理器(默认是抛出异常)
        // 2.正在执行任务的线程不会被终止,直到任务处理完毕;
        // 3.任务队列中没有线程处理的等待任务,会直接被终止;
        // 4.返回未执行的任务
        executor.shutdownNow();
    }

}

class Task implements Runnable {

    @Override
    public void run() {
        System.out.println("任务被执行!");
    }
}

第 3 章 相关 Java 基础

3.1 二进制数

在计算机中,所有的数字都是以二进制的形式存在,只用 0 和 1 两个数码来表示。为了区分正数和负数的二进制数,用最高位表示符号位(0 代表正数/1 代表负数)。

以 int 类型的二进制为例;int 类型的十进制整数转换成二进制,最多能够转换成 32 位的二进制数(前 16 位称为高 16 位,后 16 位称为低 16 位);正整数二进制数第 32 位为 0;负整数二进制数第 32 位为 1。

在后文的讲解中,遇到类似这种的 int 二进制数: 0000 0000 0000 0000 0000 0000 0010 1010 ,就直接省略为 0010 1010。

3.2 正整数十进制转二进制

以 42 为例,转化为二进制等于 0010 1010。

ThreadPoolExecutor 源码解读

3.3 二进制转正整数十进制

以 0001 0100 为例,转换为整数十进制等于 20。

ThreadPoolExecutor 源码解读

3.4 二进制加法

把二进制数从右对齐,根据“逢二进一”规则计算;二进制数加法的法则:

0+0=0
0+1=1+0=1
1+1=0 (进位为1) 
1+1+1=1 (进位为1)

例如:1110 和 1011 相加过程如下:

ThreadPoolExecutor 源码解读

3.5 二进制减法

把二进制数从右对齐,根据“借一有二”的规则计算;二进制数减法的法则:

0-0=0
1-1=0
1-0=1
0-1=1 (借位为1)

例如:1101 和 1011 相减过程如下:

ThreadPoolExecutor 源码解读

3.6 原码、反码、补码

以负数 -5 为例:(扩展:正数的二进制反码、补码都是原码本身)

  • 1000 0000 0000 0000 0000 0000 0000 0101( -5 的二进制数;这样的编码方式,称为二进制“原码”)
  • 1111 1111 1111 1111 1111 1111 1111 1010( -5 的二进制原码符号位不变,其余位取反;这样的编码方式,称为二进制“反码”)
  • 1111 1111 1111 1111 1111 1111 1111 1011(-5 的二进制反码 + 1;这样的编码方式,称为二进制“补码”)

3.7 n<<w(有符号左移)

以 5<<2 为例;就是把 5 的二进制位往左移两位,右边补 0 ,得出的十进制结果为 20。(符号位也会跟着左移运算移动)

  • 0000 0101(5 的二进制数)
  • 0001 0100(5 的二进制数向往左移动两位,右边补 0,得出的十进制结果为 20)

在计算机中,负数都是以二进制补码的形式表示。以 -5<<2 为例;就是把 -5 的二进制补码位往左移两位,右边补 0 ;得出的结果仍是二进制补码的形式(转换成十进制是 -20)。如下:

  • 1000 0000 0000 0000 0000 0000 0000 0101( -5 的二进制原码)
  • 1111 1111 1111 1111 1111 1111 1111 1010( -5 的二进制原码符号位不变,其余位取反,得到二进制反码)
  • 1111 1111 1111 1111 1111 1111 1111 1011( -5 的二进制反码 + 1 ,得到二进制补码)
  • ========= -5<<2 运算开始 ==========
  • 1111 1111 1111 1111 1111 1111 1110 1100( -5 的二进制补码向左移动两位,右边补 0,得到二进制补码形式的结果)
  • ========= -5<<2 运算结束 ==========
  • 1111 1111 1111 1111 1111 1111 1110 1011(二进制补码结果 - 1,得到二进制反码)
  • 1000 0000 0000 0000 0000 0000 0001 0100(二进制反码符号位不变,其余位再取反,得到二进制原码,转化为十进制是 -20)

3.8 a&b(按位与)

就是将 a 和 b 先转换为二进制数,然后相同位比较,只有两个都为1,结果才为 1,否则为 0。

以 3&5=1 为例:

  • 0000 0011(3 的二进制数)
  • 0000 0101(5 的二进制数)
  • 0000 0001(3&5(011 & 101)的结果,得出的十进制结果是 1)

3.9 a|b(按位或)

就是将 a 和 b 先转换为二进制数,然后相同位比较,只要一个为 1 ,结果即为 1,否则为 0 。

以 6|2=6 为例:

  • 0000 0110(6 的二进制数)
  • 0000 0010(2 的二进制数)
  • 0000 0110( 6|2(110|010)的结果,得出的十进制结果是 6)

3.10 跳出多重循环

代码示例 跳出多重循环

@Test
    public void testRetry() {
        retry:
        for (int i = 0; i < 3; i++) {
            for (int j = 0; j < 3; j++) {
                if (i == 1) {
                    continue retry;
                }
                System.out.println("i=" + i + ",j = " + j);
            }
        }

        System.out.println("------分割线------");

        retry:
        for (int i = 0; i < 3; i++) {
            for (int j = 0; j < 3; j++) {
                if (i == 1) {
                    break retry;
                }
                System.out.println("i=" + i + ",j = " + j);
            }
        }
    }

输出如下:

i=0,j = 0

i=0,j = 1

i=0,j = 2

i=2,j = 0

i=2,j = 1

i=2,j = 2

------分割线------

i=0,j = 0

i=0,j = 1

i=0,j = 2

3.11 Thread

void interrupt():在一个线程中调用另一个线程的 interrupt() 方法,会将那个线程设置成线程中断状态,而不会真的中断线程。

boolean isInterrupted(boolean ClearInterrupted):返回当前线程是否处于中断状态,ClearInterrupted 为 true,则返回的同时清除中断状态,反之则保留中断状态。

boolean interrupted():就是封装了 isInterrupted(boolean ClearInterrupted) 方法,参数为 true。

boolean interrupted():就是封装了 isInterrupted(boolean ClearInterrupted) 方法,参数为 false。

boolean isInterrupted():就是封装了 isInterrupted(boolean ClearInterrupted) 方法,参数为 false。

interrupt() 代码示例

class ThreadTest {
    public static void main(String[] args) throws Exception {

        MyThread thread = new MyThread();
        thread.start();
        
        // 主线程通过调用 thread 对象的 interrupt() 方法,将 thread 线程设置成线程中断状态,但不会真的中断线程。
        thread.interrupt();

        System.out.println("主线程执行结束!");
    }
}

class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("test thread!");
    }
}
// 打印结果:
// 主线程执行结束!
// test thread!

3.12 LinkedBlockingQueue

void offer(E e):向队尾添加元素,若队列已经满了,则直接返回 false。

E poll(long timeout, TimeUnit unit):移除并返回 BlockingQueue 头部的元素,如果队列为空,则阻塞timeout 个 unit 单位时间;如果规定时间内,仍然没有数据可取,则返回 null 。如果线程被标记为中断状态(对线程调用 interrupt() 方法),则会抛出中断异常。

E take():移除并返回 BlockingQueue 头部的元素,如果队列为空,则阻塞。如果线程被标记为中断状态(对线程调用 interrupt() 方法),则会抛出中断异常。

int drainTo(Collection<? super E> c):从 BlockingQueue 移除所有数据,并放到 c 集合里。

boolean remove(Object o):在 LinkedBlockingQueue 中移除 o。

3.13 原子性

满足以下几个特点,我们就说这个操作支持原子性,线程安全:

  1. 原子操作中的所有子操作,要不全成功、要不全失败;
  2. 线程执行原子操作过程中,不会受到其它线程的任何影响;
  3. 其它线程只能感知到原子操作开始前和结束后的变化。

解释:

包含多个操作单元,但仍支持原子性,通常都是由锁实现的。

代码示例:

class Test {
    int x = 0;
    int y = 0;

    public void test() {
        // 原子操作
        x = 10; 

        // 大致分为两步:1)获取 x 的值到缓存里;2)取出缓存里的值,赋值给 y
        // 不支持原子性;获取 x 的值到缓存里之后,其它线程可能修改 x 的值,导致 y 值错误
        y = x; 

        // 大致分为三步:1)获取 x 的值到缓存里;2)取出缓存里的值加一;3)赋值给 x
        // 不支持原子性;原理类似 y = x;
        x++; 

    }

}

3.14 Cas

Cas 是 Compare-and-swap(比较并替换)的缩写,是支持原子性的操作;在 Java 中,底层是 native 方法实现,通过 CPU 提供的 lock 信号保证的原子性。

想要将数据 V 的原值 O 替换为新值 N,执行 Cas 操作:

  1. 预先读取数据 V 的值 O 作为预期值;
  2. 执行 Cas 操作:

    1. 比较当前数据 V 的值是否是 O;

      1. 如果是,则替换为 N,返回执行成功;
      2. 如果不是,则不替换,返回执行失败;

例子:

以 java.util.concurrent.atomic 包中的 AtomicInteger 为例;

public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(100);
        // 将 AtomicInteger 的值,从 100 替换为 200
        Boolean b = atomicInteger.compareAndSet(100, 200);
        // 返回 true,替换成功
        System.out.println(b);
    }

3.15 volatile

用于修饰变量,可以保证被修饰变量的操作支持可见性和有序性,但不支持原子性。详见”Java 并发面试题集“

第 4 章 源码基础

4.1 导读

1. 工作线程定义

文中的工作线程是指线程池创建的存活线程。

4.2 ThreadPoolExecutor

代码示例 ThreadPoolExecutor 重要成员变量

public class ThreadPoolExecutor extends AbstractExecutorService {
    
   /**
    * 可通过构造函数指定的成员变量
    */

    // 创建工作线程的工厂类
    private volatile ThreadFactory threadFactory;

    // 核心线程数。用于控制线程池逻辑:当用户提交任务时,只要工作线程数小于 corePoolSize 时,就会创建工作线程执行任务;如果等于 corePoolSize,就不会创建工作线程,而是将任务放到 workQueue 队列对象里。
    // 也就是说核心线程只是一个概念,在代码中并没有标记某个线程是否是核心线程;也就是说当工作线程数小于等于 corePoolSize 时,线程池中这部分工作线程就是核心线程。
    private volatile int corePoolSize;

    // 线程存活时间。指线程空闲的情况下,可以存活的时间;核心线程不受 keepAliveTime 控制;除非 allowCoreThreadTimeOut 置为 true
    private volatile long keepAliveTime;
    
    // 是否允许核心线程空闲超时,默认为 false。如果配置 true,核心线程也会受 keepAliveTime 控制
    private volatile boolean allowCoreThreadTimeOut;
    
    // 当工作线程数等于 corePoolSize 时,提交的任务就会放在 workQueue 指定的队列对象里。
    private final BlockingQueue<Runnable> workQueue;
    
    // 表示线程池中最多能够容纳多少工作线程数。当工作线程数等于配置的 corePoolSize,并且 workQueue 已满时,线程池会创建新的非核心线程,直到线程池中的工作线程总数等于 maximumPoolSize;
    private volatile int maximumPoolSize;
    
    // 拒绝任务处理器,默认是抛出异常;如果【任务队列已满,并且工作线程总数等于 maximumPoolSize 】或者【已经调用 shutdown() 方法或 shutdownNow() 方法】,向线程池提交任务,则会执行 handler 
    private volatile RejectedExecutionHandler handler;

   /**
    * 供内部使用的成员变量
    */

    // ctl 二进制值的低29位,表示工作线程数
    // ctl 二进制值的高3位,表示线程池运行状态
    // ctl 是 AtomicInteger 类型,也就是说基于 CAS,支持原子操作,
   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // CAPACITY 表示工作线程数容量(十进制值为 536870911;二进制值为 0001 1111 1111 1111 1111 1111 1111 1111),用于参与 ctl 的计算,获取运行状态以及工作线程数;也是比 maximumPoolSize 优先级更高的工作线程数上限
    private static final int COUNT_BITS = Integer.SIZE - 3; // 29
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

    // workers 就是存放 worker(工作线程执行者,每个对象持有一个 thread 线程对象,代理了任务的执行) 的容器
    private final HashSet<Worker> workers = new HashSet<Worker>();

    // largestPoolSize 记录 workers 里保存 worker 对象最多时的数量
    private int largestPoolSize;
    
    // completedTasks 表示线程池累积处理的任务数量
    volatile long completedTasks;
    
   /**
    * 线程池状态
    */
    
    // 状态变化:
    // 运行-> 关闭(调用 shutdown方法)-> 整理-> 终止(RUNING->SHUTDOWN->TIDYING->TERMINATED)
    // 运行-> 停止(调用 shutdownNow方法)-> 整理-> 终止(RUNING->STOP->TIDYING->TERMINATED)
    
    // RUNNING 表示运行状态(初始状态);(十进制值为-536870912;二进制值为 1110 0000 0000 0000 0000 0000 0000 0000)
    private static final int RUNNING = -1 << COUNT_BITS;

    // SHUTDOWN 表示关闭状态(十进制值为 0;二进制值为 0000 0000 0000 0000 0000 0000 0000 0000);调用 shutdown() 方法时,会将线程池的状态改为 SHUTDOWN;
    private static final int SHUTDOWN = 0 << COUNT_BITS;

    // STOP 表示停止状态(十进制值为 536870912;二进制值为 0010 0000 0000 0000 0000 0000 0000 0000);调用 shutdownNow() 方法时,会将线程池的状态改为STOP;
    private static final int STOP = 1 << COUNT_BITS;

    // TIDYING 表示整理状态(十进制值为 1073741824;二进制值为 0100 0000 0000 0000 0000 0000 0000 0000),只有当线程池处于【【SHUTDOWN状态并且 workQueue 队列不存在任务】或者【STOP状态】】时,并且所有工作线程都被销毁,才会将线程池状态改为 TERMINATED ,在改为TERMINATED状态前,会先改为TIDYING,增加这个状态为了区分是否执行 terminated() 方法(该方法默认为空方法,模版模式,用于子类实现扩展),执行 terminated() 方法前为 TIDYING状态,执行后为 TERMINATED 状态
    private static final int TIDYING = 2 << COUNT_BITS;

    // TERMINATED 表示终止状态(十进制值为 1610612736;二进制值为 0110 0000 0000 0000 0000 0000 0000 0000);只有当线程池处于【【SHUTDOWN状态并且 workQueue 队列不存在任务】或者【STOP状态】】,并且所有工作线程都被销毁,并且执行完 terminated() 方法后,线程池才会设置为TERMINATED 状态
    private static final int TERMINATED = 3 << COUNT_BITS;

4.3 Worker

ThreadPoolExecutor 内部类 Worker 。顾名思义,就是目标任务的执行者,每个对象里持有一个 thread 线程对象;基于代理者模式,控制目标任务的访问与执行。

代码示例 Worker 重要成员变量

private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable {
        
        // 持有的工作线程对象(thread 是初始化 Worker 时,通过 ThreadPoolExecutor的 threadFactory 创建出来的,用于执行 firstTask)
        final Thread thread;

        // 指定的第一个目标任务;(是初始化 worker 时,指定的;如果为空,则说明可能创建时就未指定或者指定的第一个任务已经处理完了, Worker 就开始从 workQueue 取任务执行)
        Runnable firstTask;

        // 当前 Worker 对象处理的任务数量总和
        volatile long completedTasks;

<br/>

加入我的粉丝社群,阅读全部内容

从学习到面试,从面试到工作,从 coder 到 TeamLeader,每天给你答疑解惑,还能有第二份收入,这样的知识星球,难道你还要犹豫!

ThreadPoolExecutor 源码解读

原文  https://segmentfault.com/a/1190000023008886
正文到此结束
Loading...