线程池用于存放线程, 通过对线程的复用, 很大程度上减少了频繁创建和销毁线程导致的资源损耗. 下面简单地介绍一下 JDK(1.8)中的线程池.
在介绍JDK的4种线程池之前, 先介绍一下线程池的几个参数
allowCoreThreadTimeOut 允许核心线程池被回收, 默认 false, so 默认情况下 核心线程并不会被回收掉.
// 用一个原子变量来存储线程池状态和线程数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 32 - 3 = 29, int 的左边三位表示线程池状态,右边29位表示线程数量 private static final int COUNT_BITS = Integer.SIZE - 3; // 最大线程数量: 000 & (29个1) = 2^29-1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 线程池的几个状态 // 100 & (29个0),线程池运行状态 private static final int RUNNING = -1 << COUNT_BITS; // 000 & (29个0), 执行shutdown方法时,不接收新的任务, 会先执行完当前已经接收的, 任务队列任务执行完 && 线程已经全部终止: state -> TIDYING private static final int SHUTDOWN = 0 << COUNT_BITS; // 001 & (29个0), 执行 shutdownNow() 方法时 private static final int STOP = 1 << COUNT_BITS; // 010 & (29个0) private static final int TIDYING = 2 << COUNT_BITS; // 011 & (29个0), 线程池已经终止 private static final int TERMINATED = 3 << COUNT_BITS; 复制代码
固定线程池数量, 核心线程数 = 最大线程数
任务队列: LinkedBlockingQueue(Integer.MAX_VALUE) 无界队列
适用于同时处理固定任务数的场景.
public static ExecutorService newFixedThreadPool(int nThreads) { // coreThreads = maxThreads return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 复制代码
核心线程数为0, 最大为 Integer.MAX_VALUE,也就是当任务足够多的时候, 可以无限增加线程. 并且所有的线程空闲超过一段时间(调用 Executors 创建的默认 KeepAlive为 60s)就会被回收.
任务队列: SynchronousQueue 默认传入参数 fair=false
, 即处理任务非公平.
适用于处理小任务
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 复制代码
SynchronousQueue
public SynchronousQueue(boolean fair) { // fair = true 则会按照FIFO先入先出的顺序执行 // fair = false(默认值) 则优先取出最新添加的任务, 最早添加的任务最晚执行 transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); } 复制代码
单个线程的线程池
任务队列: LinkedBlockingQueue 同样是无界队列
适用于需要将任务按顺序执行的时候
public static ExecutorService newSingleThreadExecutor() { // 核心线程数=最大线程数=1 return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } 复制代码
固定核心线程数, 线程数量不会再增长, maximumPoolSize 这个参数对定时线程池没有作用.
oracle的api文档是这么写的:
While this class inherits from
ThreadPoolExecutor
, a few of the inherited tuning methods are not useful for it. In particular, because it acts as a fixed-sized pool using corePoolSize
threads and an unbounded queue, adjustments to maximumPoolSize
have no useful effect.
任务队列: DelayedWorkQueue 无界队列, 这是 ScheduledThreadPoolExecutor 的一个内部类
更适用于需要延时执行或者定时需求的场景
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } 复制代码
拒绝新任务,并且抛出异常, 默认的拒绝策略
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 直接抛出异常 throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } 复制代码
当拒绝任务的时候,由调用线程处理该任务.
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 如果线程池未停止 if (!e.isShutdown()) { // 当前线程直接调用任务的run方法 r.run(); } } } 复制代码
拒绝新任务,静悄悄的将新任务丢弃,而不通知(太坑了吧), 具体看它的代码也是什么事情都没做, 还真的就直接丢弃任务了.
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 任务被拒绝后,啥事情都不干 } } 复制代码
当任务满时, 抛弃旧的未处理的任务, 然后重新执行 execute 方法(此过程会重复), 除非线程池停止运行, 这种情况任务将被丢弃.具体看代码
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 如果线程池停止, 直接丢弃任务,不做任何处理 if (!e.isShutdown()) { // 丢弃一个任务队列中的任务 e.getQueue().poll(); // 重新执行被拒绝的任务, 如果再次被拒绝, 则会一直重复这个过程 e.execute(r); } } } 复制代码
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改状态为 shutdown advanceRunState(SHUTDOWN); // 停止空闲的线程, 有执行任务的不会停止 interruptIdleWorkers(); // ScheduledThreadPoolExecutor 定时线程池才用到这个方法 onShutdown(); } finally { mainLock.unlock(); } // 终止线程池里的所有线程 tryTerminate(); } 复制代码
interruptIdleWorkers
, 从方法名可以看出, 它会终止掉当前空闲的线程
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // 因为执行任务 runWorker() 的时候, 执行了 worker.lock()方法 // 所以如果当前线程有任务在执行, 则 tryLock不会成功, if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } } 复制代码
这个方法的作用是立刻强制停止所有线程, 即使该线程有正在执行的任务.
并且停止所有线程后,返回任务队列中还未执行的任务.
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改状态为 stop advanceRunState(STOP); // 强制停止所有线程,有任务在执行也不管 interruptWorkers(); // 返回还未执行的任务 tasks = drainQueue(); } finally { mainLock.unlock(); } // 终止线程池里的所有线程 tryTerminate(); return tasks; } // 暂停所有 worker 线程 private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } void interruptIfStarted() { Thread t; // 如果线程已经启动则直接终止 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } 复制代码
下面这段代码两个shutdown方法都调用到
final void tryTerminate() { for (;;) { int c = ctl.get(); // 以下情况,取消终止: // 1.线程池是运行状态 running // 2.runStateAtLeast(c, TIDYING) 线程池状态是 tidying 或者 terminated // 3.线程池状态为 shutdown && 任务队列不为空,即是任务没处理完 (shutdown状态的时候会继续处理已经添加的任务) if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; /** * worker 在 {@link #getTask()} 的时候, 获取到 null 表示 worker 此时需要停止 * worker[] 移除当前worker后会调用这个方法将线程进行终止 * 每个worker停止的时候, 会调用这个方法将当前线程进行终止 * so it is ONLY_ONE * {@link #processWorkerExit} */ if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 代码要执行到这里到话, ( state == stop || ( state == shutdown && workQueue 队列为空) ) && 没有正在运行的线程 // 这两种情况下, 所有的线程都已经终止 // cas 尝试修改 ctl 的状态, // 修改失败,外层 for循环会再次执行 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 这里不做任何事情 terminated(); } finally { // 最后将状态修改为 terminated, 表示线程池完全停止 ctl.set(ctlOf(TERMINATED, 0)); // 通知所有在等待锁的线程 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } } 复制代码
shutdown: 线程将执行完已经添加进队列中的所有任务, 不接受新任务.
shutdownNow: 立刻终止所有正在运行的线程, 并且返回任务队列中的任务.