要求:线程资源必须通过 线程池
当前线程总数< corePoolSize
,新建的线程即为核心线程。 当前线程总数< corePoolSize
,新建的线程即为核心线程。 核心线程默认情况下会一直存活在线程池中,即使这个核心线程不工作(空闲状态),除非 ThreadPoolExecutor 的 allowCoreThreadTimeOut
这个属性为 true
线程总数 = 核心线程数 + 非核心线程数
keepAliveTime即为空闲线程允许的最大的存活时间。如果一个非核心线程空闲状态的时长超过keepAliveTime了,就会被销毁掉。注意:如果设置 allowCoreThreadTimeOut = true
TimeUnit 是一个枚举类型,列举如下:
单位 | 说明 |
NANOSECONDS | 1微毫秒 = 1微秒 / 1000 |
MICROSECONDS | 1微秒 = 1毫秒 / 1000 |
MILLISECONDS | 1毫秒 = 1秒 /1000 |
HOURS | 小时 |
DAYS | 天 |
常用的 workQueue 类型:(无界队列、有界队列、同步移交队列)
,SynchronousQueue队列接收到任务后,会 直接将任务从生产者移交给工作者线程
,这种移交机制高效。它是一种不存储元素的队列,任务不会先放到队列中去等线程来取,而是直接移交给执行的线程。只有当线程池是无界的或可以拒绝任务的时候,SynchronousQueue队列的使用才有意义,maximumPoolSize 一般指定成 Integer.MAX_VALUE,即无限大。要将一个元素放入SynchronousQueue,就需要有另一个线程在等待接收这个元素。若没有线程在等待,并且线程池的当前线程数小于最大值,则ThreadPoolExecutor就会新建一个线程;否则,根据饱和策略,拒绝任务。 newCachedThreadPool
默认使用的就是这种同步移交队列。吞吐量高于LinkedBlockingQueue。 链表结构
的阻塞队列, FIFO原则排序
。当任务提交过来,若当前线程数小于corePoolSize核心线程数,则线程池新建核心线程去执行任务;若当前线程数等于corePoolSize核心线程数,则进入工作队列进行等待。LinkedBlockingQueue队列没有最大值限制,只要任务数超过核心线程数,都会被添加到队列中,这就会导致 总线程数永远不会超过 corePoolSize
,所以maximumPoolSize 是一个无效设定。 newFixedThreadPool
和 newSingleThreadPool
默认是使用的是 无界LinkedBlockingQueue队列
。吞吐量高于ArrayBlockingQueue。 数组结构
的 有界
阻塞队列,可以设置队列上限值, FIFO原则排序
。当任务提交时,若当前线程小于corePoolSize核心线程数,则新建核心线程执行任务;若当先线程数等于corePoolSize核心线程数,则进入队列排队等候;若队列的任务数也排满了,则新建非核心线程执行任务;若队列满了且总线程数达到了maximumPoolSize最大线程数,则根据饱和策略进行任务的拒绝。 注意:只有当任务相互独立没有任何依赖的时候,线程池或工作队列设置有界是合理的;若任务之间存在依赖性,需要使用无界的线程池,如newCachedThreadPool,否则有可能会导致死锁问题。
创建线程的方式,这是一个接口,你 new 他的时候需要实现他的 Thread newThread(Runnable r) 方法,一般用不上,
未达到 corePoolSize
,则 新建一个线程(核心线程)
执行任务 达到了 corePoolSize
,则将任务移入 阻塞队列等待
,让空闲线程处理; 队列已满
, 新建线程(非核心线程)
执行任务 总线程数又达到了 maximumPoolSize
,就会按照拒绝策略处理无法执行的任务,比如RejectedExecutionHandler抛出异常。 这边,为了大家能够更好的去理解这块的流程,我们举一个例子。生活中我们经常会去打一些公司的咨询电话或者是一些特定机构的投诉电话,而那个公司或者机构的客服中心就是一个 线程池
,正式员工的客服小姐姐就好比是 核心线程
5. 当用户的电话打进到公司的客服中心的时候 (提交任务)
6. 客服中心会调度客服小姐姐去接听电话 (创建线程执行任务)
,如果接听的电话超过了6个,6个客服小姐姐都在接听的工作状态了 (核心线程池满了)
,这时客服中心会有一个电话接听等待通道 (进入任务队列等待)
7. 当然,这个电话接听等待通道也是有上限的,当超过这个上限的时候 (任务队列满了)
,客服中心就会立即安排外协员工 (非核心线程)
,也就是非正式员工去接听额外的电话 (任务队列满了,正式和非正式员工数量>总任务数,线程池创建非核心线程去执行任务)
8. 当用户电话数激增,客服中心控制台发现这个时候正式员工和外协员工的总和已经满足不了这些用户电话接入了 (总线程池满)
,就开始根据一些公司电话接听规则去拒绝这些电话 (按照拒绝策略处理无法执行的任务)
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, //核心线程数 int maximumPoolSize, //最大线程数 long keepAliveTime, //空闲线程存活时间 TimeUnit unit, //存活时间单位 BlockingQueue<Runnable> workQueue, //任务的阻塞队列 ThreadFactory threadFactory, //新线程的产生方式 RejectedExecutionHandler handler //拒绝策略) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
ThreadPoolExecutor 继承 AbstractExecutorService;AbstractExecutorService 实现 ExecutorService, ExecutorService 继承 Executor
public class ThreadPoolExecutor extends AbstractExecutorService {} public abstract class AbstractExecutorService implements ExecutorService {} public interface ExecutorService extends Executor {}
// 5参数构造器 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
// 6参数构造器-1 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
// 6参数构造器-2 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
// 7参数构造器 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
//创建固定数目线程的线程池 Executors.newFixedThreadPool(200); //创建一个无限线程的线程池,无需等待队列,任务提交即执行 Executors.newCachedThreadPool() //创建有且仅有一个线程的线程池 Executors.newSingleThreadExecutor();
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads);
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
ExecutorService singleThreadPool = Executors.newSingleThreadPool();
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } }
ThreadPoolExecutor.execute(Runnable command)方法,即可向线程池内添加一个任务
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ //获取当前线程池的状态 int c = ctl.get(); //若当前线程数量小于corePoolSize,则创建一个新的线程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //判断当前线程是否处于运行状态,且写入任务阻塞队列是否成功 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //再次获取线程状态进行双重检查;如果线程变成非运行状态,则从阻塞队列移除任务; if (! isRunning(recheck) && remove(command)) //执行拒绝策略 reject(command); //若当前线程池为空,则新建一个线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //当前线程为非运行状态并且尝试新建线程,若失败则执行拒绝策略。 else if (!addWorker(command, false)) reject(command); }
1)若 当前线程数小于corePoolSize
2)若 当前线程不小于corePoolSize
3)若 队列里的任务数到达上限
,且 当前运行线程小于maximumPoolSize
4)若创建线程也失败( 队列任务数到达上限
,且 当前线程数达到了maximumPoolSize
ThreadPoolExecutor的饱和策略可以通过调用 setRejectedExecutionHandler
RejectedExecutionHandler rejected = null; //默认策略,阻塞队列满,则丢任务、抛出异常 rejected = new ThreadPoolExecutor.AbortPolicy(); //阻塞队列满,则丢任务,不抛异常 rejected = new ThreadPoolExecutor.DiscardPolicy(); //删除队列中最旧的任务(最早进入队列的任务),尝试重新提交新的任务 rejected = new ThreadPoolExecutor.DiscardOldestPolicy(); //队列满,不丢任务,不抛异常,若添加到线程池失败,那么主线程会自己去执行该任务 rejected = new ThreadPoolExecutor.CallerRunsPolicy();
是 默认的饱和策略
当新提交的任务 无法保存到队列
中等待执行时, DiscardPolicy
会悄悄的 抛弃该任务
则会 抛弃最旧的
CallerRunsPolicy是“调用者运行”策略,实现了一种调节机制 。它 不会抛弃任务
,也 不会抛出异常
。 而是 将任务回退到调用者
。它 不会在线程池中
执行任务,而是 在一个调用了execute的线程中
