转载

JDK源码分析-ThreadPoolExecutor

概述

ThreadPoolExecutor 是 JDK 中线程池的实现类, 它的继承结构如下:

JDK源码分析-ThreadPoolExecutor

本文主要分析  ThreadPoolExecutor  类的主要方法和实现原理(部分代码暂未涉及,后面有机会再行分析),以后再分析  Executor  和  ExecutorService 接口的相关内容。

代码分析

成员变量

该类中的成员变量较多,下面分析一些主要的。


 

// 该变量是一个原子整型变量,保存了线程池的状态和线程数量

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3; // 32-3=29

// 线程的最大容量(即池内允许的最大线程数)

// 00011111 11111111 11111111 11111111,即 29 个 1,超过 5 亿

private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 2^29-1


// runState is stored in the high-order bits

// 线程池的运行状态,保存在 ctl 的高位

private static final int RUNNING = -1 << COUNT_BITS;

private static final int SHUTDOWN = 0 << COUNT_BITS;

private static final int STOP = 1 << COUNT_BITS;

private static final int TIDYING = 2 << COUNT_BITS;

private static final int TERMINATED = 3 << COUNT_BITS;

这里用了一个原子整型(AtomicInteger,可以理解为线程安全的 Integer 类,占用  4 个字节,32 位 )变量 ctl 来表示线程池的运行状态和线程池内部的线程数量。 其中高 3 位表示线程池的运行状态,低 29 位表示线程池中线程的数量。

线程池的 状态 有以下 5 种

1. RUNNING: 接受新的任务,并且处理任务队列中的任务;

2. SHUTDOWN: 不接受新的任务,但处理任务队列中的任务;

3. STOP: 不接受新的任务,不处理任务队列中的任务,并且中断正在进行的任务;

4. TIDYING: 所有的任务都已终结,工作线程的数量为 0;

5. TERMINATED: 执行 terminated() 方法后进入该状态,terminated() 方法默认实现为空。

这些状态之间的转换流程及触发条件如图所示:

JDK源码分析-ThreadPoolExecutor

接下来看其他成员变量:


 

// 任务队列(阻塞队列)

private final BlockingQueue<Runnable> workQueue;

// 互斥锁

private final ReentrantLock mainLock = new ReentrantLock();

// 工作线程集合

private final HashSet<Worker> workers = new HashSet<Worker>();

// 锁对应的条件

private final Condition termination = mainLock.newCondition();

// 线程池创建过的最大线程数量

private int largestPoolSize;

// 已完成任务的数量

private long completedTaskCount;

// 线程工厂类,用于创建线程

private volatile ThreadFactory threadFactory;

// 拒绝策略

private volatile RejectedExecutionHandler handler;

// 空闲线程的存活时间

private volatile long keepAliveTime;

/*

* 核心线程是否允许超时

* 默认为 false,表示核心线程即使处于空闲状态也继续存活;

* 若为 true,核心线程同样受到 keepAliveTime 的超时约束

*/

private volatile boolean allowCoreThreadTimeOut;

// 核心池大小

private volatile int corePoolSize;

// 最大池大小

private volatile int maximumPoolSize;

// 默认拒绝策略

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

这里有几个重要的成员变量:

corePoolSize: 核心池大小;

maximumPoolSize: 最大池大小,线程池中能同时存在的最大线程数,大于等于 corePoolSize

workQueue:  工作/任务队列,是一个 阻塞队列,可参考前文「 JDK源码分析-BlockingQueue 」的分析。

为便于理解,这里先大概描述下向线程池提交任务的流程,后面再分析其代码实现:

① 初始化一个 容量为 corePoolSize 的池子;

② 刚开始,每来一个任务就在池中创建一个线程去执行该任务,直到池中的容量到达 corePoolSize;

③ 此时若再来任务,则把这些任务放到 workQueue 中;

④ 若 workQueue 也满了,则继续创建线程执行任务,直到线程数量达到  maximumPoolSize;

⑤ 若 workQueue 已满,且线程数量达到  maximumPoolSize,此时若还有任务到来,则执行拒绝策略(handler)。

keepAliveTime & allowCoreThreadTimeOut

其中 keepAliveTime 表示空闲线程的存活时间,这两个值有一定关联:

若 allowCoreThreadTimeOut 为 false (默认),且线程数量超出 corePoolSize,则空闲时间超过  keepAliveTime 的线程会被关闭(最多保留 corePoolSize 个线程存活);

若将  allowCoreThreadTimeOut 设为 true,核心池的线程也会受该超时的影响而关闭。

构造器

ThreadPoolExecutor 内部有多个构造器,但最终都是调用下面这个:


 

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler) {

if (corePoolSize < 0 ||

maximumPoolSize <= 0 ||

maximumPoolSize < corePoolSize ||

keepAliveTime < 0)

throw new IllegalArgumentException();

if (workQueue == null || threadFactory == null || handler == null)

throw new NullPointerException();

this.acc = System.getSecurityManager() == null ?

null : AccessController.getContext();

this.corePoolSize = corePoolSize;

this.maximumPoolSize = maximumPoolSize;

this.workQueue = workQueue;

this.keepAliveTime = unit.toNanos(keepAliveTime);

this.threadFactory = threadFactory;

this.handler = handler;

}

构造器参数虽然比较多,但基本都是简单的赋值,前面已经分析过这些成员变量的含义,这里不再赘述。下面分析它的核心方法 execute。

在此之前,先看几个常用方法:


 

// Packing and unpacking ctl

// 根据 ctl 和 CAPACITY 得到线程池的运行状态

private static int runStateOf(int c) { return c & ~CAPACITY; }

// 根据 ctl 和 CAPACITY 得到线程池中的线程数量

private static int workerCountOf(int c) { return c & CAPACITY; }

// 将线程池运行状态和线程数量合并为 ctl

private static int ctlOf(int rs, int wc) { return rs | wc; }

execute 方法代码如下:


 

// command 是一个 Runnable 对象,也就是用户提交执行的任务

public void execute(Runnable command) {

// 提交的任务为空时抛出异常

if (command == null)

throw new NullPointerException();

/*

* Proceed in 3 steps:

*

* 1. If fewer than corePoolSize threads are running, try to

* start a new thread with the given command as its first

* task. The call to addWorker atomically checks runState and

* workerCount, and so prevents false alarms that would add

* threads when it shouldn't, by returning false.

*

* 2. If a task can be successfully queued, then we still need

* to double-check whether we should have added a thread

* (because existing ones died since last checking) or that

* the pool shut down since entry into this method. So we

* recheck state and if necessary roll back the enqueuing if

* stopped, or start a new thread if there are none.

*

* 3. If we cannot queue task, then we try to add a new

* thread. If it fails, we know we are shut down or saturated

* and so reject the task.

*/

// 获取当前 ctl (存有线程池状态和线程数量)

int c = ctl.get();

// 若当前工作线程数量小于核心池大小(coolPoolSize)

// 则在核心池中新增一个工作线程,并将该任务交给这个线程执行

if (workerCountOf(c) < corePoolSize) {

if (addWorker(command, true))

return;

// 重新获取(存在并发可能)

c = ctl.get();

}

// 若执行到这里,表示池中线程数量 >= corePoolSize,或者上面 addWorker 失败

// 若线程池处于 RUNNING 状态,并且该任务(command)成功添加到任务队列

if (isRunning(c) && workQueue.offer(command)) {

// 再次获取 ctl 值

int recheck = ctl.get();

// 若线程池不是运行状态,则要把上面添加的任务从队列中移除并执行拒绝策略

//(可理解为“回滚”操作)

if (! isRunning(recheck) && remove(command))

// 执行拒绝策略

reject(command);

// 若此时池中没有线程,则新建一个

// PS: 这里是防止任务提交后,池中没有存活的线程了

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

// 根据上述代码分析,若执行到这里,可分为以下两种情况:

// ① 线程池不是 RUNNING 状态;

// ② 线程池处于 RUNNING 状态,且实际线程数量 workCount >= corePoolSize,

// 并且,添加到 workQueue 失败(已满)

// 此时,则需要和 maximumPoolSize 进行比较,

// 若 workCount <= maximumPoolSize, 则新建一个线程去执行该任务;

// 否则,即 workCount > maximumPoolSize (饱和),则执行拒绝策略

else if (!addWorker(command, false))

// 执行拒绝策略

reject(command);

}

该方法描述的就是一个任务提交到线程池的流程, 主要执行逻辑如下:

1. 若正在运行的线程数少于 corePoolSize,则创建一个新的线程,并将传入的任务(command)作为它的第一个任务执行。

2. 若运行的线程数不小于  corePoolSize,则将新来的 任务添加到任务队列(workQueue)。若入队成功,仍需再次检查是否需要增加一个线程(上次检查之后现有的线程可能死了,或者进入该方法时线程池 SHUTDOWN 了,此时需要执行回滚);若池中没有线程则新建一个(确保 SHUTDOWN 状态也能执行队列中的任务)。

3. 若任务不能入队(队列已满),则创建新的线程并执行任务,若失败(超过 maximumPoolSize),则表示线程池关闭或者已经饱和,因此拒绝该任务。

为了便于理解,可参考下面的流程图:

JDK源码分析-ThreadPoolExecutor

下面分析 Worker 类及 addWorker 方法。

内部嵌套类 Worker


 

// 继承自 AQS,且实现了 Runnable 接口

private final class Worker

extends AbstractQueuedSynchronizer

implements Runnable

{

/** Thread this worker is running in. Null if factory fails. */

final Thread thread;

/** Initial task to run. Possibly null. */

// 运行的第一个任务,可能为空

Runnable firstTask;

/** Per-thread task counter */

volatile long completedTasks;

/**

* Creates with given first task and thread from ThreadFactory.

* @param firstTask the first task (null if none)

*/

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

// 初始化 thread

this.thread = getThreadFactory().newThread(this);

}

/** Delegates main run loop to outer runWorker */

public void run() {

runWorker(this);

}

// 其他一些 AQS 相关的方法不再一一列举

}

可以看到 Worker 类继承自 AQS,它的实现与 ReentrantLock 有一些类似,可对比前文「 JDK源码分析-ReentrantLock 」分析。而且,Worker 类实现了 Runnable 接口,它的 run 方法是将自身作为参数传递给了外部类的 runWorker 方法,下面分析这两个方法。

addWorker 方法:


 

// firstTask: 第一个任务,可为空

// core: 是否为核心池,true 是,false 为最大池

private boolean addWorker(Runnable firstTask, boolean core) {

// 该循环的主要作用就是增加 workCount 计数,增加成功后再新增 Worker 对象

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

/*

* rs >= SHUTDOWN 表示线程池不再接受新的任务

* 该判断条件分为以下三种:

* ① 线程池处于 STOP, TYDING 或 TERMINATED 状态;

* ② 线程池处于 SHUTDOWN 状态,且 firstTask 不为空;

* ③ 线程池处于 SHUTDOWN 状态,且 workQueue 为空

* 满足任一条件即返回 false.

*/

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&

firstTask == null &&

! workQueue.isEmpty()))

return false;

for (;;) {

int wc = workerCountOf(c);

// 超出最大容量 CAPACITY,或者超出初始设置的核心池/最大池数量,则返回 false

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

// CAS 方式增加 ctl 的 workerCount 数量(该循环的主要目的)

if (compareAndIncrementWorkerCount(c))

break retry; // 若增加失败则退出循环

c = ctl.get(); // Re-read ctl

// 运行状态改变

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

// 标记 Worker 是否启动、是否添加成功

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

// 将 firstTask 封装成 Worker 对象

w = new Worker(firstTask);

// 获取 thread 对象 t

final Thread t = w.thread;

if (t != null) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

int rs = runStateOf(ctl.get());

// 若线程池状态小于 SHUTDOWN,即为 RUNNING 状态;

// 或者为 SHUTDOWN 状态,且 firstTask 为空,

// 表示不再接受新的任务,但会继续执行队列中的任务

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

// 添加到工作线程集合(HashSet)

workers.add(w);

// 更新最大计数

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

// 标记 Worker 添加成功

workerAdded = true;

}

} finally {

mainLock.unlock();

}

// 若成功添加到工作线程集合,则启动线程执行任务

if (workerAdded) {

// 启动线程

t.start();

workerStarted = true;

}

}

} finally {

// Worker 启动失败,执行回滚操作

if (! workerStarted)

addWorkerFailed(w);

}

return workerStarted;

}

runWorker 方法:


 

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;

try {

// task 不为空时才执行,循环执行

// getTask 是从 workQueue 中获取任务

while (task != null || (task = getTask()) != null) {

w.lock();

// If pool is stopping, ensure thread is interrupted;

// if not, ensure thread is not interrupted. This

// requires a recheck in second case to deal with

// shutdownNow race while clearing interrupt

// 若线程池状态 >= STOP,则需要中断该线程

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

// 中断线程

wt.interrupt();

try {

// 任务执行前调用该方法

beforeExecute(wt, task);

Throwable thrown = null;

try {

// 执行任务

task.run();

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

// 任务执行后调用该方法

afterExecute(task, thrown);

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

// getTask 返回空,说明任务队列没有任务了

processWorkerExit(w, completedAbruptly);

}

}

可以看到这里有 beforeExecute 和 afterExecute 方法,分别表示提交的任务执行前后做的事情,在 ThreadPoolExecutor 类中这两个都是空方法。我们可以通过继承 ThreadPoolExecutor 类并重写这两个方法来定制自己的需求。

getTask 方法:


 

// 从任务队列(阻塞队列)中取任务

private Runnable getTask() {

boolean timedOut = false; // Did the last poll() time out?


for (;;) {

// 获取线程池运行状态

int c = ctl.get();

int rs = runStateOf(c);


// Check if queue empty only if necessary.

/*

* 线程池运行状态 rs >= SHUTDOWN,表示非 RUNNING 状态

* 该判断条件有两个:

* 1. rs >= STOP;

* 2. rs == SHUTDOWN,且工作队列为空

* 若满足上述条件中的一个,则将线程数量(workerCount)减少 1,返回 null

*/

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

decrementWorkerCount(); // 减少工作现场数量

return null; // 返回 null 表示会从池中移除一个 Worker

}

int wc = workerCountOf(c);

// Are workers subject to culling?

// 是否要移除 Worker

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 线程数大于 maximumPoolSize,或者需要移除 Worker

if ((wc > maximumPoolSize || (timed && timedOut))

&& (wc > 1 || workQueue.isEmpty())) {

if (compareAndDecrementWorkerCount(c))

return null; // 返回空意味着会减少移除一个 Worker

continue;

}

try {

// 从 workQueue 中获取任务(Runnable 对象)

Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

workQueue.take();

if (r != null)

return r;

timedOut = true;

} catch (InterruptedException retry) {

timedOut = false;

}

}

}

该方法主要是从任务队列 workQueue 中获取任务,并且控制池内线的程数量。

拒绝策略

拒绝策略 RejectedExecutionHandler 是一个接口,它只有一个 rejectedExecution 方法,代码如下:


 

public interface RejectedExecutionHandler {

// 执行拒绝策略

void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

}

它在 ThreadPoolExecutor 中的 几个实现类如下:

JDK源码分析-ThreadPoolExecutor

ThreadPoolExecutor  默认的拒绝策 略为 AbortPolicy,代码如下:


 

public static class AbortPolicy implements RejectedExecutionHandler {


public AbortPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

// 抛出异常

throw new RejectedExecutionException("Task " + r.toString() +

" rejected from " +

e.toString());

}

}

可以看到,该策略就是直接抛出 RejectedExecutionException 异常。 其他拒绝策 略代码也都相对简单,不再一一列举。 值得一提的是,如果我们对这几种策略都不满意,可以自定义 拒绝策略(实现 RejectedExecutionHandler 接口)。

小结

本文主要分析了线程池 ThreadPoolExecutor 类的主要成员变量和核心方法实现,主要包括一个任务(Runnable)的提交流程。

该类稍微有些复杂,分析时首先要搞清楚任务提交的流程以及 主要成员变量(workQueue、corePoolSize、maximumP oolSize、keepAliveTime、allowCoreThreadTimeOut 等)的含义,接下来再分析会更清晰。

PS: 本文 是本人参考网上的一些文章及个人理解的结果,如有不正之处,敬请指正。此外,也建议大家多读几篇相关文章进行比较分析,以便更容易理解。

相关阅读:

JDK源码分析-BlockingQueue

JDK源码分析-AbstractQueuedSynchronizer(2)

参考链接:

https://javadoop.com/2017/09/05/java-thread-pool/

http://ideabuffer.cn/2017/04/04/%E6%B7%B1%E5%85%A5%E7%90%86%E8%A7%A3Java%E7%BA%BF%E7%A8%8B%E6%B1%A0%EF%BC%9AThreadPoolExecutor/

JDK源码分析-ThreadPoolExecutor

原文  http://mp.weixin.qq.com/s?__biz=MzU4NzYyMDE4MQ==&mid=2247484006&idx=1&sn=5246d85b8c701d00532299ec2906dcc1
正文到此结束
Loading...