它用于执行指定的任务,把任务提交与任务执行分离,程序员不需要关注线程的管理,以及任务的执行。 ExecutorService 接口对 Executor 接口提供更多的扩展,ThreadPoolExecutor 类提供 可以扩展的线程池实现,而 Executors 只是对这些 Executor 提供方便的工厂方法。
类结构:
class FutureTask { // 任务运行状态 /* * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; // 底层运行的 callable private Callable<V> callable; // 任务结果,非 volatile,因为受 state 读写保护 private Object outcome; // 运行 callable 任务的线程 private volatile Thread runner; private volatile WaitNode waiters; // 在 Treiber stack 中记录等待中的线程 static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } } } 复制代码
两个重要的构造函数:
// 包装 Callable,state = NEW public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } 复制代码
第二个构造器使用了 Executors 工具类的 callable() 方法来把 Runnable 适配成 Callable。典型的 适配器模式。
public static <T> Callable<T> callable(Runnable task, T result) { return new RunnableAdapter<T>(task, result); } static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } } 复制代码
class ThreadPoolExecutor { // 状态控制,包含两个信息: // workerCount: 工作线程数 // runState: 表明线程池运行状态 // RUNNING: 可以接受新的 task,且能处理队列中的 task // SHUTDOWN: 不接受新的 task,但能处理队列中的 task // STOP: 不接受新的 task,不处理队列中的 task,并且中断运行中的 task // TIDYING: 所有任务都被终止,workerCount = 0,线程状态为 TIDYING // TERMINATED: terminated() 方法执行完成 // RUNNING // shutdownNow()| / shutdown() // |/ // // STOP---- SHUTDOWN // 线程池为空| / 队列和线程池都为空 // |/ // / // TIDYING------------- TERMINATED // terminated() // 初始化:ctl = RUNNING private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 11101 private static final int COUNT_BITS = Integer.SIZE - 3; // 00011111111111111111111111111111 536870911 // worker 线程最大数量 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState 存储在高位 // 111 开头 -536870912 private static final int RUNNING = -1 << COUNT_BITS; // 000 开头 0 private static final int SHUTDOWN = 0 << COUNT_BITS; // 001 开头 536870912 private static final int STOP = 1 << COUNT_BITS; // 010 开头 1073741824 private static final int TIDYING = 2 << COUNT_BITS; // 011 开头 1610612736 private static final int TERMINATED = 3 << COUNT_BITS; // ~CAPACITY = 11100000000000000000000000000000 private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } private static boolean runStateLessThan(int c, int s) { return c < s; } private static boolean runStateAtLeast(int c, int s) { return c >= s; } private static boolean isRunning(int c) { return c < SHUTDOWN; } } 复制代码
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 执行任务线程 final Thread thread; // 待执行任务 Runnable firstTask; // 每个线程完成任务数 volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // 设置AQS的同步状态为-1,禁止中断,直到调用 runWorker this.firstTask = firstTask; // 使用 ThreadFactory 创建线程 this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { // 核心 runWorker(this); } } 复制代码
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 初始时,ctl = RUNNING = 11100000000000000000000000000000 int c = ctl.get(); // wokerCountOf(c) => c & 00011111111111111111111111111111 // 线程数小于 corePoolSize,创建线程执行 task if (workerCountOf(c) < corePoolSize) { // 创建线程,并启动线程执行 command if (addWorker(command, true)) return; c = ctl.get(); } // 两种情况: // ① 线程数大于等于 corePoolSize; // ② 线程数小于 corePoolSize,但是 addWorker() 线程池状态不符合条件,或创建线程失败或启动线程失败 // 线程池处于运行状态才能往队列添加任务 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 线程池非 RUNNING 状态 // 队列已满 // 此时使用 maximumPoolSize 作为界限判断 else if (!addWorker(command, false)) reject(command); } /** * addWoker() 方法返回 false 几种情况: * 1)线程池停止或 shutdown * 2)ThreadFactory 创建线程失败 * @param core true:使用 corePoolSize 作为界限;false:使用 maximumPoolSize 作为界限 */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // ① 线程池状态是 RUNNING,可以创建工作线程,即执行下面的 for 循环 // ② 线程池状态是 STOP、TIDYING 或 TERMINATED,直接返回 false,即不能创建工作线程执行任务 // ③ 线程池状态时 SHUTDOWN,此时 firstTask 在此种状态下应该为 null,即它不能接收新的任务 // 所以如果 firstTask 不为 null,那么可以直接返回 false // 如果 firstTask 为 null,此时线程池可以处理队列中任务,所以如果队列为空,那么直接返回 false,否则需要创建工作线程处理队列中的任务 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // runState 为 RUNNING 或 SHUTDOWN for (;;) { int wc = workerCountOf(c); // woker 线程数超过界限,返回 false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // CAS 增加 workerCount if (compareAndIncrementWorkerCount(c)) break retry; // 失败 c = ctl.get(); // Re-read ctl // 可能 runState 被其他线程改变,非 RUNNING 或 SHUTDOWN 状态 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 初始化 Worker,创建线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 线程创建成功 // 全局锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 再次检查线程池的运行状态等 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // 线程处于活跃状态,即线程已经开始执行或者还未死亡,正确的应线程在这里应该是还未开始执行的 throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 实际会调用 Worker 类的 run() 方法 t.start(); workerStarted = true; } } } finally { if (! workerStarted) //未能成功创建执行工作线程 addWorkerFailed(w); //在启动工作线程失败后,将工作线程从集合中移除 } return workerStarted; } 复制代码
从上述代码分析可以看出,有很多书上在分析线程池的执行原理是有问题的。
小结上述逻辑:
整个流程图如下,画的不太好,主要还是为了梳理逻辑,因为整个代码用到了太多 if() 语句的短路思维,当然这些主要是跟线程池的状态有关。