概述
ThreadPoolExecutor 是 JDK 中线程池的实现类, 它的继承结构如下:
本文主要分析 ThreadPoolExecutor 类的主要方法和实现原理(部分代码暂未涉及,后面有机会再行分析),以后再分析 Executor 和 ExecutorService 接口的相关内容。
代码分析
成员变量
该类中的成员变量较多,下面分析一些主要的。
// 该变量是一个原子整型变量,保存了线程池的状态和线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 32-3=29
// 线程的最大容量(即池内允许的最大线程数)
// 00011111 11111111 11111111 11111111,即 29 个 1,超过 5 亿
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 2^29-1
// runState is stored in the high-order bits
// 线程池的运行状态,保存在 ctl 的高位
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
这里用了一个原子整型(AtomicInteger,可以理解为线程安全的 Integer 类,占用 4 个字节,32 位 )变量 ctl 来表示线程池的运行状态和线程池内部的线程数量。 其中高 3 位表示线程池的运行状态,低 29 位表示线程池中线程的数量。
线程池的 状态 有以下 5 种 :
1. RUNNING: 接受新的任务,并且处理任务队列中的任务;
2. SHUTDOWN: 不接受新的任务,但处理任务队列中的任务;
3. STOP: 不接受新的任务,不处理任务队列中的任务,并且中断正在进行的任务;
4. TIDYING: 所有的任务都已终结,工作线程的数量为 0;
5. TERMINATED: 执行 terminated() 方法后进入该状态,terminated() 方法默认实现为空。
这些状态之间的转换流程及触发条件如图所示:
接下来看其他成员变量:
// 任务队列(阻塞队列)
private final BlockingQueue<Runnable> workQueue;
// 互斥锁
private final ReentrantLock mainLock = new ReentrantLock();
// 工作线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 锁对应的条件
private final Condition termination = mainLock.newCondition();
// 线程池创建过的最大线程数量
private int largestPoolSize;
// 已完成任务的数量
private long completedTaskCount;
// 线程工厂类,用于创建线程
private volatile ThreadFactory threadFactory;
// 拒绝策略
private volatile RejectedExecutionHandler handler;
// 空闲线程的存活时间
private volatile long keepAliveTime;
/*
* 核心线程是否允许超时
* 默认为 false,表示核心线程即使处于空闲状态也继续存活;
* 若为 true,核心线程同样受到 keepAliveTime 的超时约束
*/
private volatile boolean allowCoreThreadTimeOut;
// 核心池大小
private volatile int corePoolSize;
// 最大池大小
private volatile int maximumPoolSize;
// 默认拒绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
这里有几个重要的成员变量:
corePoolSize: 核心池大小;
maximumPoolSize: 最大池大小,线程池中能同时存在的最大线程数,大于等于 corePoolSize ;
workQueue: 工作/任务队列,是一个 阻塞队列,可参考前文「 JDK源码分析-BlockingQueue 」的分析。
为便于理解,这里先大概描述下向线程池提交任务的流程,后面再分析其代码实现:
① 初始化一个 容量为 corePoolSize 的池子;
② 刚开始,每来一个任务就在池中创建一个线程去执行该任务,直到池中的容量到达 corePoolSize;
③ 此时若再来任务,则把这些任务放到 workQueue 中;
④ 若 workQueue 也满了,则继续创建线程执行任务,直到线程数量达到 maximumPoolSize;
⑤ 若 workQueue 已满,且线程数量达到 maximumPoolSize,此时若还有任务到来,则执行拒绝策略(handler)。
keepAliveTime & allowCoreThreadTimeOut
其中 keepAliveTime 表示空闲线程的存活时间,这两个值有一定关联:
若 allowCoreThreadTimeOut 为 false (默认),且线程数量超出 corePoolSize,则空闲时间超过 keepAliveTime 的线程会被关闭(最多保留 corePoolSize 个线程存活);
若将 allowCoreThreadTimeOut 设为 true,核心池的线程也会受该超时的影响而关闭。
构造器
ThreadPoolExecutor 内部有多个构造器,但最终都是调用下面这个:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null : AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造器参数虽然比较多,但基本都是简单的赋值,前面已经分析过这些成员变量的含义,这里不再赘述。下面分析它的核心方法 execute。
在此之前,先看几个常用方法:
// Packing and unpacking ctl
// 根据 ctl 和 CAPACITY 得到线程池的运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 根据 ctl 和 CAPACITY 得到线程池中的线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 将线程池运行状态和线程数量合并为 ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
execute 方法代码如下:
// command 是一个 Runnable 对象,也就是用户提交执行的任务
public void execute(Runnable command) {
// 提交的任务为空时抛出异常
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 获取当前 ctl (存有线程池状态和线程数量)
int c = ctl.get();
// 若当前工作线程数量小于核心池大小(coolPoolSize)
// 则在核心池中新增一个工作线程,并将该任务交给这个线程执行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 重新获取(存在并发可能)
c = ctl.get();
}
// 若执行到这里,表示池中线程数量 >= corePoolSize,或者上面 addWorker 失败
// 若线程池处于 RUNNING 状态,并且该任务(command)成功添加到任务队列
if (isRunning(c) && workQueue.offer(command)) {
// 再次获取 ctl 值
int recheck = ctl.get();
// 若线程池不是运行状态,则要把上面添加的任务从队列中移除并执行拒绝策略
//(可理解为“回滚”操作)
if (! isRunning(recheck) && remove(command))
// 执行拒绝策略
reject(command);
// 若此时池中没有线程,则新建一个
// PS: 这里是防止任务提交后,池中没有存活的线程了
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 根据上述代码分析,若执行到这里,可分为以下两种情况:
// ① 线程池不是 RUNNING 状态;
// ② 线程池处于 RUNNING 状态,且实际线程数量 workCount >= corePoolSize,
// 并且,添加到 workQueue 失败(已满)
// 此时,则需要和 maximumPoolSize 进行比较,
// 若 workCount <= maximumPoolSize, 则新建一个线程去执行该任务;
// 否则,即 workCount > maximumPoolSize (饱和),则执行拒绝策略
else if (!addWorker(command, false))
// 执行拒绝策略
reject(command);
}
该方法描述的就是一个任务提交到线程池的流程, 主要执行逻辑如下:
1. 若正在运行的线程数少于 corePoolSize,则创建一个新的线程,并将传入的任务(command)作为它的第一个任务执行。
2. 若运行的线程数不小于 corePoolSize,则将新来的 任务添加到任务队列(workQueue)。若入队成功,仍需再次检查是否需要增加一个线程(上次检查之后现有的线程可能死了,或者进入该方法时线程池 SHUTDOWN 了,此时需要执行回滚);若池中没有线程则新建一个(确保 SHUTDOWN 状态也能执行队列中的任务)。
3. 若任务不能入队(队列已满),则创建新的线程并执行任务,若失败(超过 maximumPoolSize),则表示线程池关闭或者已经饱和,因此拒绝该任务。
为了便于理解,可参考下面的流程图:
下面分析 Worker 类及 addWorker 方法。
内部嵌套类 Worker
// 继承自 AQS,且实现了 Runnable 接口
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
// 运行的第一个任务,可能为空
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 初始化 thread
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// 其他一些 AQS 相关的方法不再一一列举
}
可以看到 Worker 类继承自 AQS,它的实现与 ReentrantLock 有一些类似,可对比前文「 JDK源码分析-ReentrantLock 」分析。而且,Worker 类实现了 Runnable 接口,它的 run 方法是将自身作为参数传递给了外部类的 runWorker 方法,下面分析这两个方法。
addWorker 方法:
// firstTask: 第一个任务,可为空
// core: 是否为核心池,true 是,false 为最大池
private boolean addWorker(Runnable firstTask, boolean core) {
// 该循环的主要作用就是增加 workCount 计数,增加成功后再新增 Worker 对象
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/*
* rs >= SHUTDOWN 表示线程池不再接受新的任务
* 该判断条件分为以下三种:
* ① 线程池处于 STOP, TYDING 或 TERMINATED 状态;
* ② 线程池处于 SHUTDOWN 状态,且 firstTask 不为空;
* ③ 线程池处于 SHUTDOWN 状态,且 workQueue 为空
* 满足任一条件即返回 false.
*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 超出最大容量 CAPACITY,或者超出初始设置的核心池/最大池数量,则返回 false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS 方式增加 ctl 的 workerCount 数量(该循环的主要目的)
if (compareAndIncrementWorkerCount(c))
break retry; // 若增加失败则退出循环
c = ctl.get(); // Re-read ctl
// 运行状态改变
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 标记 Worker 是否启动、是否添加成功
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 将 firstTask 封装成 Worker 对象
w = new Worker(firstTask);
// 获取 thread 对象 t
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 若线程池状态小于 SHUTDOWN,即为 RUNNING 状态;
// 或者为 SHUTDOWN 状态,且 firstTask 为空,
// 表示不再接受新的任务,但会继续执行队列中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加到工作线程集合(HashSet)
workers.add(w);
// 更新最大计数
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 标记 Worker 添加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 若成功添加到工作线程集合,则启动线程执行任务
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
// Worker 启动失败,执行回滚操作
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
runWorker 方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// task 不为空时才执行,循环执行
// getTask 是从 workQueue 中获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 若线程池状态 >= STOP,则需要中断该线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 中断线程
wt.interrupt();
try {
// 任务执行前调用该方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 任务执行后调用该方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// getTask 返回空,说明任务队列没有任务了
processWorkerExit(w, completedAbruptly);
}
}
可以看到这里有 beforeExecute 和 afterExecute 方法,分别表示提交的任务执行前后做的事情,在 ThreadPoolExecutor 类中这两个都是空方法。我们可以通过继承 ThreadPoolExecutor 类并重写这两个方法来定制自己的需求。
getTask 方法:
// 从任务队列(阻塞队列)中取任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// 获取线程池运行状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
* 线程池运行状态 rs >= SHUTDOWN,表示非 RUNNING 状态
* 该判断条件有两个:
* 1. rs >= STOP;
* 2. rs == SHUTDOWN,且工作队列为空
* 若满足上述条件中的一个,则将线程数量(workerCount)减少 1,返回 null
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); // 减少工作现场数量
return null; // 返回 null 表示会从池中移除一个 Worker
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 是否要移除 Worker
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 线程数大于 maximumPoolSize,或者需要移除 Worker
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null; // 返回空意味着会减少移除一个 Worker
continue;
}
try {
// 从 workQueue 中获取任务(Runnable 对象)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
该方法主要是从任务队列 workQueue 中获取任务,并且控制池内线的程数量。
拒绝策略
拒绝策略 RejectedExecutionHandler 是一个接口,它只有一个 rejectedExecution 方法,代码如下:
public interface RejectedExecutionHandler {
// 执行拒绝策略
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
它在 ThreadPoolExecutor 中的 几个实现类如下:
ThreadPoolExecutor 默认的拒绝策 略为 AbortPolicy,代码如下:
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 抛出异常
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
可以看到,该策略就是直接抛出 RejectedExecutionException 异常。 其他拒绝策 略代码也都相对简单,不再一一列举。 值得一提的是,如果我们对这几种策略都不满意,可以自定义 拒绝策略(实现 RejectedExecutionHandler 接口)。
小结
本文主要分析了线程池 ThreadPoolExecutor 类的主要成员变量和核心方法实现,主要包括一个任务(Runnable)的提交流程。
该类稍微有些复杂,分析时首先要搞清楚任务提交的流程以及 主要成员变量(workQueue、corePoolSize、maximumP oolSize、keepAliveTime、allowCoreThreadTimeOut 等)的含义,接下来再分析会更清晰。
PS: 本文 是本人参考网上的一些文章及个人理解的结果,如有不正之处,敬请指正。此外,也建议大家多读几篇相关文章进行比较分析,以便更容易理解。
相关阅读:
JDK源码分析-BlockingQueue
JDK源码分析-AbstractQueuedSynchronizer(2)
参考链接:
https://javadoop.com/2017/09/05/java-thread-pool/
http://ideabuffer.cn/2017/04/04/%E6%B7%B1%E5%85%A5%E7%90%86%E8%A7%A3Java%E7%BA%BF%E7%A8%8B%E6%B1%A0%EF%BC%9AThreadPoolExecutor/