转载

Java并发——深入 ThreadPoolExecutor 任务执行原理

它用于执行指定的任务,把任务提交与任务执行分离,程序员不需要关注线程的管理,以及任务的执行。 ExecutorService 接口对 Executor 接口提供更多的扩展,ThreadPoolExecutor 类提供 可以扩展的线程池实现,而 Executors 只是对这些 Executor 提供方便的工厂方法。

1.1 类图

Java并发——深入 ThreadPoolExecutor 任务执行原理

2 Future

2.1 类图

Java并发——深入 ThreadPoolExecutor 任务执行原理

2.2 FutureTask

类结构:

class FutureTask {
    // 任务运行状态
    /*
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    
    // 底层运行的 callable
    private Callable<V> callable;
    // 任务结果,非 volatile,因为受 state 读写保护
    private Object outcome;
    // 运行 callable 任务的线程
    private volatile Thread runner;
    
    private volatile WaitNode waiters;
    
    //  在 Treiber stack 中记录等待中的线程
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
}
复制代码

两个重要的构造函数:

// 包装 Callable,state = NEW
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
复制代码

第二个构造器使用了 Executors 工具类的 callable() 方法来把 Runnable 适配成 Callable。典型的 适配器模式。

public static <T> Callable<T> callable(Runnable task, T result) {
    return new RunnableAdapter<T>(task, result);
}

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
复制代码

3 ThreadPoolExecutor

3.1 类结构

class ThreadPoolExecutor {
    // 状态控制,包含两个信息:
    // workerCount: 工作线程数
    // runState: 表明线程池运行状态
    //     RUNNING: 可以接受新的 task,且能处理队列中的 task
    //     SHUTDOWN: 不接受新的 task,但能处理队列中的 task
    //     STOP: 不接受新的 task,不处理队列中的 task,并且中断运行中的 task
    //     TIDYING: 所有任务都被终止,workerCount = 0,线程状态为 TIDYING
    //     TERMINATED: terminated() 方法执行完成
    //                  RUNNING
    //       shutdownNow()|    / shutdown()
    //                    |/    //
    //                 STOP---- SHUTDOWN
    //            线程池为空|       / 队列和线程池都为空
    //                    |/    //          /
    //                    TIDYING------------- TERMINATED
    //                            terminated()
    // 初始化:ctl = RUNNING
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 11101
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 00011111111111111111111111111111  536870911
    // worker 线程最大数量
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState 存储在高位
    // 111 开头 -536870912
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 000 开头 0
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 001 开头 536870912
    private static final int STOP       =  1 << COUNT_BITS;
    // 010 开头 1073741824
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 011 开头 1610612736
    private static final int TERMINATED =  3 << COUNT_BITS;

    // ~CAPACITY = 11100000000000000000000000000000
    private static int runStateOf(int c)     { return c & ~CAPACITY; }

    private static int workerCountOf(int c)  { return c & CAPACITY; }

    private static int ctlOf(int rs, int wc) { return rs | wc; }

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
}
复制代码

3.2 内部类 Worker

private final class Worker extends AbstractQueuedSynchronizer
        implements Runnable {
    // 执行任务线程
    final Thread thread;
    // 待执行任务
    Runnable firstTask;
    // 每个线程完成任务数
    volatile long completedTasks;
    
    Worker(Runnable firstTask) {
        setState(-1); // 设置AQS的同步状态为-1,禁止中断,直到调用 runWorker
        this.firstTask = firstTask;
        // 使用 ThreadFactory 创建线程
        this.thread = getThreadFactory().newThread(this);
    }
    
    /** Delegates main run loop to outer runWorker  */
    public void run() {
        // 核心
        runWorker(this);
    }
}
复制代码

3.3 任务执行(execute)

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 初始时,ctl = RUNNING = 11100000000000000000000000000000
    int c = ctl.get();
    // wokerCountOf(c) => c & 00011111111111111111111111111111
    // 线程数小于 corePoolSize,创建线程执行 task
    if (workerCountOf(c) < corePoolSize) {
        // 创建线程,并启动线程执行 command
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // 两种情况:
    // ① 线程数大于等于 corePoolSize;
    // ② 线程数小于 corePoolSize,但是 addWorker() 线程池状态不符合条件,或创建线程失败或启动线程失败
    // 线程池处于运行状态才能往队列添加任务
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 线程池非 RUNNING 状态
    // 队列已满
    // 此时使用 maximumPoolSize 作为界限判断
    else if (!addWorker(command, false))
        reject(command);
}

/**
 * addWoker() 方法返回 false 几种情况:
 *    1)线程池停止或 shutdown 
 *    2)ThreadFactory 创建线程失败
 * @param core true:使用 corePoolSize 作为界限;false:使用 maximumPoolSize 作为界限
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        // ① 线程池状态是 RUNNING,可以创建工作线程,即执行下面的 for 循环
        // ② 线程池状态是 STOP、TIDYING 或 TERMINATED,直接返回 false,即不能创建工作线程执行任务
        // ③ 线程池状态时 SHUTDOWN,此时 firstTask 在此种状态下应该为 null,即它不能接收新的任务
        // 所以如果 firstTask 不为 null,那么可以直接返回 false
        // 如果 firstTask 为 null,此时线程池可以处理队列中任务,所以如果队列为空,那么直接返回 false,否则需要创建工作线程处理队列中的任务
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        
        // runState 为 RUNNING 或 SHUTDOWN
        for (;;) {
            int wc = workerCountOf(c);
            // woker 线程数超过界限,返回 false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
 
            // CAS 增加 workerCount
            if (compareAndIncrementWorkerCount(c))
                break retry; // 失败
            c = ctl.get();  // Re-read ctl
            // 可能 runState 被其他线程改变,非 RUNNING 或 SHUTDOWN 状态
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 初始化 Worker,创建线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        
        if (t != null) { // 线程创建成功
            // 全局锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 再次检查线程池的运行状态等
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) 
                        // 线程处于活跃状态,即线程已经开始执行或者还未死亡,正确的应线程在这里应该是还未开始执行的
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 实际会调用 Worker 类的 run() 方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted) //未能成功创建执行工作线程
            addWorkerFailed(w);  //在启动工作线程失败后,将工作线程从集合中移除
    }
    return workerStarted;
}
复制代码

从上述代码分析可以看出,有很多书上在分析线程池的执行原理是有问题的。

Java并发——深入 ThreadPoolExecutor 任务执行原理

小结上述逻辑:

  • 线程池控制标识是原子类型,即 AtomicInteger
  • execute() 方法只关注线程池核心工作流程,具体执行细节交由其他方法处理,主要是 addWorker() 方法
  • 工作线程小于 corePoolSize 时,创建工作线程以及执行任务统一交给 addWorker() 方法处理
  • addWorker() 方法在创建工作线程执行任务之前,需要判断线程池当前状态是否满足可以创建工作线程。
    • RUNNING 状态可以接收新的任务(能创建工作线程),且能处理队列中的任务
    • SHUTDOWN 状态不能接收新的任务(不能创建工作线程),但能处理队列中的任务
  • 使用 CAS 计数工作线程,即更新线程池控制标识
  • 任务和线程统一由其内部类 Worker 进行封装,Worker 类本身也是 Runnable

整个流程图如下,画的不太好,主要还是为了梳理逻辑,因为整个代码用到了太多 if() 语句的短路思维,当然这些主要是跟线程池的状态有关。

Java并发——深入 ThreadPoolExecutor 任务执行原理
原文  https://juejin.im/post/5ef725385188252e3b06b391
正文到此结束
Loading...