在上一篇文章 Java并发 之 线程池系列 (1) 让多线程不再坑爹的线程池 中,我们介绍了使用JDK concurrent包下的工厂和工具类 Executors
来创建线程池常用的几种方法:
//创建固定线程数量的线程池 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10); //创建一个线程池,该线程池会根据需要创建新的线程,但如果之前创建的线程可以使用,会重用之前创建的线程 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); //创建一个只有一个线程的线程池 ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); 复制代码
诚然,这种创建线程池的方法非常简单和方便。但仔细阅读源码,却把我吓了一条: 这是要老子的命啊!
我们前面讲过,如果有新的请求过来,在线程池中会创建新的线程处理这些任务,一直创建到线程池的最大容量(Max Size)为止;超出线程池的最大容量的Tasks,会被放入阻塞队列(Blocking Queue)进行等待,知道有线程资源释放出来为止;要知道的是,阻塞队列也是有最大容量的,多余队列最大容量的请求不光没有获得执行的机会,连排队的资格都没有!
那这些连排队的资格都没有的Tasks怎么处理呢?不要急,后面在介绍 ThreadPoolExecutor
的拒绝处理策略(Handler Policies for Rejected Task)的时候会详细介绍。
说到这里你也许有写疑惑了,上面这些东西,我通常使用 Executors
的时候没有指定过啊。是的,因为 Executors
很“聪明”地帮我们做了这些事情。
我们看下 Executors
的 newFixedThreadPool
和 newSingleThreadExecutor
方法的源码:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor( nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 复制代码
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } 复制代码
其实它们底层还是通过 ThreadPoolExecutor
来创建 ExecutorService
的,这里对妻子的参数先不作介绍,下面会详细讲,这里只说一下 new LinkedBlockingQueue<Runnable>()
这个参数。
LinkedBlockingQueue
就是当任务数大于线程池的线程数的时候的阻塞队列,这里使用的是无参构造,我们再看一下构造函数:
/** * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } 复制代码
我们看到阻塞队列的默认大小竟然是 Integer.MAX_VALUE
!
如果不做控制,拼命地往阻塞队列里放Task,分分钟“Out of Memory”啊!
还有更绝的, newCachedThreadPool
方法:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 复制代码
最大线程数默认也是 Integer.MAX_VALUE
,也就是说,如果之前的任务没有执行完就有新的任务进来了,就会继续创建新的线程,指导创建到 Integer.MAX_VALUE
为止。
下面提供一个使用 newCachedThreadPool
创建大量线程处理Tasks,最终 OutOfMemoryError
的例子。
友情提醒:场面过于血腥,请勿在生产环境使用。
package net.ijiangtao.tech.concurrent.jsd.threadpool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadPoolExample2 { private static final ExecutorService executorService = Executors.newCachedThreadPool(); private static class Task implements Runnable { @Override public void run() { try { Thread.sleep(1000 * 600); } catch (InterruptedException e) { e.printStackTrace(); } } } private static void newCachedThreadPoolTesterBadly() { System.out.println("begin............"); for (int i = 0; i <= Integer.MAX_VALUE; i++) { executorService.execute(new Task()); } System.out.println("end."); } public static void main(String[] args) { newCachedThreadPoolTesterBadly(); } } 复制代码
当 main
方法启动以后,打开控制面板,看到CPU和内存几乎已经全部耗尽:
很快控制台就抛出了 java.lang.OutOfMemoryError
:
begin............ Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:717) at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378) at net.ijiangtao.tech.concurrent.jsd.threadpool.ThreadPoolExample2.newCachedThreadPoolTesterBadly(ThreadPoolExample2.java:24) at net.ijiangtao.tech.concurrent.jsd.threadpool.ThreadPoolExample2.main(ThreadPoolExample2.java:30) 复制代码
下面我们在看Java开发手册这条规定,应该就明白作者的良苦用心了吧。
【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明: Executors返回的线程池对象的弊端如下: 1)FixedThreadPool和SingleThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。 2)CachedThreadPool和ScheduledThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
解铃还须系铃人,其实避免这个 OutOfMemoryError
风险的钥匙就藏在 Executors
的源码里,那就是自己直接使用 ThreadPoolExecutor
。
构造一个 ThreadPoolExecutor
需要蛮多参数的。下面是 ThreadPoolExecutor
的构造函数。
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } 复制代码
下面就一一介绍一下这些参数的具体含义。
其实从源码中的JavaDoc已经可以很清晰地明白这些参数的含义了,下面照顾懒得看英文的同学,再解释一下:
线程池核心线程数。
默认情况下核心线程会一直存活,即使处于闲置状态也不会受存 keepAliveTime
限制,除非将 allowCoreThreadTimeOut
设置为 true
。
线程池所能容纳的最大线程数。超过 maximumPoolSize
的线程将被阻塞。
最大线程数 maximumPoolSize
不能小于 corePoolSize
非核心线程的闲置超时时间。
超过这个时间非核心线程就会被回收。
keepAliveTime
的时间单位,如TimeUnit.SECONDS。
当将 allowCoreThreadTimeOut
为true时,对corePoolSize生效。
线程池中的任务队列。
没有获得线程资源的任务将会被放入 workQueue
,等待线程资源被释放。如果放入 workQueue
的任务数大于 workQueue
的容量,将由 RejectedExecutionHandler
的拒绝策略进行处理。
常用的有三种队列: SynchronousQueue
, LinkedBlockingDeque
, ArrayBlockingQueue
。
提供创建新线程功能的线程工厂。
ThreadFactory
是一个接口,只有一个 newThread
方法:
Thread newThread(Runnable r); 复制代码
无法被线程池处理的任务的处理器。
一般是因为任务数超出了 workQueue
的容量。
总结一下,当一个任务通过 execute(Runnable)
方法添加到线程池时:
如果此时线程池中线程的数量小于 corePoolSize
,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize
,但是缓冲队列 workQueue
未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于 corePoolSize
,缓冲队列 workQueue
满,并且线程池中的数量小于 maximumPoolSize
,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于 corePoolSize
,缓冲队列 workQueue
满,并且线程池中的数量等于 maximumPoolSize
,那么通过 handler所指定的拒绝策略来处理此任务。
处理任务的优先级为:核心线程数(corePoolSize) > 任务队列容量(workQueue) > 最大线程数(maximumPoolSize);如果三者都满了,使用rejectedExecutionHandler处理被拒绝的任务。
下面就通过一个简单的例子,使用 ThreadPoolExecutor
构造的线程池执行任务。
package net.ijiangtao.tech.concurrent.jsd.threadpool; import java.time.LocalTime; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * @author ijiangtao.net */ public class ThreadPoolExample3 { private static final AtomicInteger threadNumber = new AtomicInteger(1); private static class Task implements Runnable { @Override public void run() { try { Thread.currentThread().sleep(2000); System.out.println(Thread.currentThread().getName() + "-" + LocalTime.now()); } catch (InterruptedException e) { e.printStackTrace(); } } } private static class MyThreadFactory implements ThreadFactory { private final String namePrefix; public MyThreadFactory(String namePrefix) { this.namePrefix = namePrefix; } public Thread newThread(Runnable runnable) { return new Thread(runnable, namePrefix + "-" + threadNumber.getAndIncrement()); } } private static final ExecutorService executorService = new ThreadPoolExecutor( 10, 20, 30, TimeUnit.SECONDS, new LinkedBlockingDeque<>(50), new MyThreadFactory("MyThreadFromPool"), new ThreadPoolExecutor.AbortPolicy()); public static void main(String[] args) { // creates five tasks Task r1 = new Task(); Task r2 = new Task(); Task r3 = new Task(); Task r4 = new Task(); Task r5 = new Task(); // submit方法有返回值 Future future = executorService.submit(r1); System.out.println("r1 isDone ? " + future.isDone()); // execute方法没有返回值 executorService.execute(r2); executorService.execute(r3); executorService.execute(r4); executorService.execute(r5); //关闭线程池 executorService.shutdown(); } } 复制代码
r1 isDone ? false MyThreadFromPool-2-21:04:03.215 MyThreadFromPool-5-21:04:03.215 MyThreadFromPool-4-21:04:03.215 MyThreadFromPool-3-21:04:03.215 MyThreadFromPool-1-21:04:03.215 复制代码
从结果看,从线程池取出了5个线程,并发执行了5个任务。
这一章我们介绍了一种更安全、更定制化的线程池构建方式: ThreadPoolExecutor
。相信你以后不敢轻易使用 Executors
来构造线程池了。
后面我们会介绍线程池的更多实现方式(例如使用Google核心库Guava),以及关于线程池的更多知识和实战。