转载

Java并发系列 — 线程池

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。

在开发过程中,合理地使用线程池能够带来3个好处:

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  • 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。

  • 提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用线程池,必须对其实现原理了如指掌。

如何创建线程池

1、使用 Executors 工厂类提供的静态方法来创建线程池,方法如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
}

2、通过 ThreadPoolExecutor 的构造函数创建,代码如下:

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

参数说明:

  • 1、corePoolSize

    线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

  • 2、maximumPoolSize

    线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize。

  • 3、keepAliveTime

    线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用。

  • 4、unit

    keepAliveTime的单位

  • 5、workQueue

    用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口。

    在JDK中提供了如下阻塞队列: ArrayBlockingQueue :基于数组结构的有界阻塞队列,按FIFO排序任务;

    LinkedBlockingQuene :基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;

    SynchronousQuene :一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;

    priorityBlockingQuene :具有优先级的无界阻塞队列;

  • 6、threadFactory

    创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。

  • 7、handler

    线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:

    AbortPolicy
    CallerRunsPolicy
    DiscardOldestPolicy
    DiscardPolicy
    

    当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

使用示例

public class ThreadPoolExecutorExample {

    // 1、通过threadPoolExecutor的构造函数创建线程池
    private static ThreadPoolExecutor threadPoolExecutor = 
            new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS,
                                    new ArrayBlockingQueue<Runnable>(10));


    public static void main(String[] args)
                throws ExecutionException, InterruptedException {

        // 2、使用execute方法执行没有返回结果的任务
        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                new Task().doSomething();
            }
        });

        // 3、使用submit方法执行有返回结果的任务且需要实现Callable接口
        Future future = threadPoolExecutor.submit(new Callable<Integer>() {
              @Override
              public Integer call() throws Exception {
                  return new Task().doOtherthing();
              }
          }
        );
        System.out.println(future.get());
    }
}

class Task {

    public void doSomething() {
        System.out.println("doSomeThing ...");
    }

    public int doOtherthing() {
        System.out.println("doOtherthing ..., and return 10.");
        return 10;
    }
}

execute方法和submit方法的区别

  • execute方法

    用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。

  • submit方法

    用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功。 get() 方法会阻塞当前线程直到任务完成; get(long timeout,TimeUnit unit) 方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

ThreadPoolExecutor的实现原理

当向线程池提交一个任务之后,线程池是如何处理这个任务的呢?线程池的主要处理流程如下:

Java并发系列 — 线程池
  1. 如果当前运行的线程小于 corePoolSize ,则创建新线程来执行;
  2. 如果运行的线程等于或多于 corePoolSize ,则将任务加入BlockingQueue;
  3. 如果无法将任务加入 BlockingQueue (队列已满),则创建新的线程来处理任务;
  4. 如果创建新线程将使当前运行的线程超出 maximumPoolSize ,任务将被拒绝,并调用 handler.rejectedExecution(command, this) 方法。

ThreadPoolExecutor 采取上述步骤的总体设计思路,是为了在执行executor()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在 ThreadPoolExecutor 完成预热之后(当前运行的线程数大于等于 corePoolSize ),几乎所有的 execute() 方法调用都是执行步骤2,而步骤2不需要获取全局锁。

ThreadPoolExecutor源码分析

类结构图

Java并发系列 — 线程池

核心变量与方法(状态转换)

// 初始化状态和数量,状态为RUNNING,线程数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 前3位表示状态,所有线程数占29位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池容量大小为 1 << 29 - 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 线程池状态
// RUNNING状态:11100000000000000000000000000000(前3位为111)
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN状态:00000000000000000000000000000000(前3位为000)
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// STOP状态:00100000000000000000000000000000(前3位为001)
private static final int STOP       =  1 << COUNT_BITS;
// TIDYING状态:01000000000000000000000000000000(前3位为010)
private static final int TIDYING    =  2 << COUNT_BITS;
// TERMINATED状态:01100000000000000000000000000000(前3位为011)
private static final int TERMINATED =  3 << COUNT_BITS;

// 得到状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 得到线程数
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

ThreadPoolExecutor线程池有5个状态,分别是:

  1. RUNNING :可以接受新的任务,也可以处理阻塞队列里的任务。
  2. SHUTDOWN :不接受新的任务,但是可以处理阻塞队列里的任务。
  3. STOP :不接受新的任务,不处理阻塞队列里的任务,中断正在处理的任务。
  4. TIDYING :过渡状态,也就是说所有的任务都执行完了,当前线程池已经没有有效的线程,这个时候线程池的状态将会TIDYING,并且将要调用 terminated 方法。
  5. TERMINATED :终止状态。 terminated 方法调用完成以后的状态。

线程池的状态转换过程:

  • RUNNING -> SHUTDOWN On invocation of shutdown(), perhaps implicitly in finalize()
  • (RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow()
  • SHUTDOWN -> TIDYING When both queue and pool are empty
  • STOP -> TIDYING When pool is empty
  • TIDYING -> TERMINATED When the terminated() hook method has completed

Threads waiting in awaitTermination() will return when the state reaches TERMINATED.

核心方法

execute方法

使用ThreadPoolExecutor执行任务的时候,可以使用execute或submit方法,submit方法如下:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

通过源码可知 submit 方法同样也是由execute()完成的,execute()方法源码如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 过程1
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // 过程2
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次检查线程池状态
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 线程池中没有可用的工作线程时    
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) // 过程3
        // 过程4
        reject(command);
}

addWorker方法

addWorker 方法的主要工作就是创建一个工作线程执行任务,代码如下:

/*
 * firstTask参数:用于指定新增的线程执行的第一个任务。
 * core为true:表示在新增线程时会判断当前活动线程数是否少于corePoolSize,
 *      false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /**
         * 只有当下面两种情况会继续执行,其他直接返回false(添加失败)
         * 1、rs == RUNNING
         * 2、rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty() 
         *(执行了shutdown方法,但是阻塞队列还有任务没有执行)
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 判断工作线程的数量是否超过线程池的限制
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // workerCount加1成功,跳出两层循坏。
            if (compareAndIncrementWorkerCount(c))
                break retry;
            /**
              * 能执行到这里,都是因为多线程竞争,只有两种情况
              * 1、workCount发生变化,compareAndIncrementWorkerCount失败,
              *                         这种情况不需要重新获取ctl,继续for循环即可。
              * 2、runState发生变化,可能执行了shutdown或者shutdownNow,
              *                       这种情况重新走retry,取得最新的ctl并判断状态。
              */
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    // worker是否执行标识
    boolean workerStarted = false;
    // worker是否添加成功标识
    boolean workerAdded = false;
    // 保存创建的worker变量
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // 检查线程是否创建成功
        if (t != null) {
            // 加锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 加锁成功,重新检查线程池的状态 
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 将w存储到workers容器中
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 添加成功标识
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 执行任务
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // 失败回退,从 wokers 移除 w, 线程数减一,尝试结束线程池(调用tryTerminate 方法)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker类

在分析 t.start() 之前,需要了解Worker类。其源码如下:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{

    // 工作线程
    final Thread thread;
    // 初始化任务
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        // 禁止中断,直到runWorker
        setState(-1);
        this.firstTask = firstTask;
        // 很重要,worker实例被包装成thread执行的任务。
        // 这样t.start启动后,将运行Worker的run方法。
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }

    // 实现AQS的相关方法
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

runWorker方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 自旋操作,获取队列中的任务
        while (task != null || (task = getTask()) != null) {
            // 加锁的作用是线程池关闭时,防止正在执行工作线程被中断。
            w.lock();
             /*
              * 在执行任务之前先做一些处理。
              *  1. 如果线程池已经处于STOP状态并且当前线程没有被中断,中断线程。
              *  2. 如果线程池还处于RUNNING或SHUTDOWN状态,并且当前线程已经被中断了,
              *       重新检查一下线程池状态,如果处于* * STOP状态并且没有被中断,那么中断线程。
              */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();

            try {
                // hook method
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                     // 真正的开始执行任务,调用的是run方法,而不是start方法。
                    //  这里run的时候可能会被中断,比如线程池调用了shutdownNow方法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // hook method
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 回收woker
        processWorkerExit(w, completedAbruptly);
    }
}

getTask方法

private Runnable getTask() {

    boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 计算从队列获取任务的方式( poll or take)
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 当工作线程超过其最大值或者timed = true时其workQueue.isEmpty()时,返回null。
        // 这意味为该worker将被回收。
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

由上可知,当 allowCoreThreadTimeOut 为true时,如果队列长时间没有任务,工作线程最终都会被销毁。

其他知识点

关闭线程池

可以通过调用线程池的 shutdownshutdownNow 方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。

但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。

只要调用了这两个关闭方法中的任意一个, isShutdown 方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回true。

合理地配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

  • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。

  • 任务的优先级:高、中和低。

  • 任务的执行时间:长、中和短。

  • 任务的依赖性:是否依赖其他系统资源,如数据库连接。

性质不同的任务可以用不同规模的线程池分开处理:

1)CPU密集型任务应配置尽可能小的线程,如配置N cpu +1个线程的线程池。

2)IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*N cpu 。

3)混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。

依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。

可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。有一次我们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞住,任务积压在线程池里。如果当时我们设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然我们的系统所有的任务是用的单独的服务器部署的,而我们使用不同规模的线程池跑不同类型的任务,但是出现这样问题时也会影响到其他任务。

线程池监控

如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根据线程池的使用状况快速定位问题。可以通过线程池提供的参数进行监控,在监控线程池的时机可以使用以下属性:

  • taskCount:线程池需要执行的任务数量。
  • completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。
  • largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以直到线程池是否曾经满过。
  • getPoolSize:线程池的线程数量。
  • getActiveCount:获取活动的线程数。

通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的 beforeExecuteafterExecuteterminated 方法,也可以在任务执行前、执行后和线程池关闭之前执行一些代码来进行监控。

参考资料

  1. Java线程池ThreadPoolExecutor源码分析

  2. 【细谈Java并发】谈谈线程池:ThreadPoolExecutor

如果读完觉得有收获的话,欢迎点赞、关注、加公众号【牛觅技术】,查阅更多精彩历史!!!:

Java并发系列 — 线程池
原文  https://juejin.im/post/5b07eb60f265da0dbf0057f8
正文到此结束
Loading...