ThreadPoolExecutor提供了4种构造方法,以最多参数的为例
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
序号 | 参数名 | 类型 | 含义 |
---|---|---|---|
1 | corePoolSize | int | 核心线程数 |
2 | maximumPoolSize | int | 最大线程数 |
3 | keepAliveTime | long | 线程最大空闲时间 |
4 | unit | TimeUnit | 空闲时间单位 |
5 | workQueue | BlockingQueue<Runnable> | 工作队列 |
6 | threadFactory | ThreadFactory | 线程工厂 |
7 | handler | RejectedExecutionHandler | 拒绝策略 |
移步
新的线程通过指定的ThreadFactory创建。 如果未指定,则使用Executors.defaultThreadFactory默认工厂,创建的线程将全部属于同一个ThreadGroup中。
DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; }
所有的线程具有相同的NORM_PRIORITY优先级和非守护进程状态。
public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }
通过指定的ThreadFactory,可以控制线程的名称,线程组,优先级,守护进程状态等。
ThreadPoolExecutor.AbortPolicy 抛出java.util.concurrent.RejectedExecutionException异常
ThreadPoolExecutor.CallerRunsPolicy 线程调用运行该任务的execute 本身运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
ThreadPoolExecutor.DiscardPolicy 默认情况下将丢弃被拒绝的任务
ThreadPoolExecutor.DiscardOldestPolicy 如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序
当一个任务通过execute(Runnable)方法欲添加到线程池时
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,就要通过 handler所指定的策略来处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
corePoolSize与maximumPoolSize相等,所有线程都是核心线程,线程池大小固定;
keepAliveTime = 0 该参数无效,因为FixedThreadPool全部为核心线程;
workQueue = LinkedBlockingQueue,缺省初始化大小时队列最大长度为Integer.MAX_VALUE,实际上有内存大小控制队列实际长度,(JVM线程分配内存 -Xss)如果任务提交速度持续大余任务处理速度,大量线程阻塞在队列中,可能在拒绝策略前OOM;
LinkedBlockingQueue 中的putLock和takeLock皆为非公平锁,因此FixedThreadPool的任务执行是无序的;
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
特殊的FixedThreadPool,线程池大小固定1,单线程执行
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,执行线程数无限制(实际数量由虚拟机内存限制);
keepAliveTime = 60s,线程空闲60s后回收。
workQueue = SynchronousQueue,同步队列,此队列入队必须出队必须同时传递,队列内部不会存储元素,因此CachedThreadPool线程实际上不会有队列等待;
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }
DelayedWorkQueue保证添加到队列中的任务,会按照任务的延时时间进行排序,延时时间少的任务首先被获取
public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
newWorkStealingPool是一个并行的线程池,参数中传入的是一个线程并发的数量,和之前4种线程池不同,这个线程池不会保证任务的顺序执行,也就是 WorkStealing(工作窃取)的意思,抢占式的工作会创建一个含有足够多线程的线程池,来维持相应的并行级别,它会通过工作窃取的方式使多核的CPU不会闲置,总会有活着的线程让CPU运行