我们为什么要用线程池?讨论这个问题之前,应该先说明为什么使用多线程。使用多线程,本质上就是提高程序性能。我们经常说,多线程能让程序更快,快在哪里?所以应该说明一下,如何衡量性能的高低。
性能的核心指标有2个,吞吐量和延迟。吞吐量是指,单位时间内能处理的请求的数量,吞吐量越大,意味着能处理的请求越多。延迟是指,从发出请求到收到响应的间隔时间,延迟越小,说明速度越快。这2个指标内部有一定的联系,同等条件下,延迟越小,吞吐量越大。但是不能相互转换,因为是不同维度的概念,延迟是时间维度,吞吐量是空间维度。
那我们想要提升程序的性能,就得从这2个方面入手。降低延迟,提高吞吐量。基本上有2种手段,一是发挥算法的优势,二是发挥硬件的优势。后者与并发编程息息相关。那从硬件的角度来说,提高硬件的效率,无非是提高CPU的使用率和IO设备的使用率。如果只有1个线程工作,那么这个线程使用CPU的时候,IO设备肯定是空闲的,使用IO设备的时候,CPU肯定是空闲的。这就存在了资源的浪费。假设一个接口,CPU处理时间100毫秒,IO时间100毫秒,那么单线程环境下,1秒中能响应5个请求。如果有2个线程,一个线程在使用CPU的时候,另一个使用IO,这种情况下,CPU和IO的利用率就是100%,200毫秒能处理2个请求(忽略线程切换成本),1秒中处理10个请求,吞吐量多了一倍。
好了,我们梳理完为什么使用多线程,再来看看为什么用线程池。首先,线程不能无节制的创建,线程在操作系统中是一种资源,创建销毁都需要时间空间。其次,根据JVM规范,一个线程默认的最大栈空间是1M,这个空间是从系统内存中分配的,线程太多会占用大量的内存。再次,线程过多,会频繁的切换上下文,这是多余的损耗。还有,如果我们创建销毁一个线程总共需要10ms,而任务只需要执行5ms,那么按照传统的 创建线程->执行任务->销毁线程 这个流程,实际干活仅花了小部分时间,不如不销毁线程,因此出现了线程池:创建一定数量的线程执行任务,任务来的时候,直接从线程池取线程,任务结束了就把线程归还到线程池中。这样大大增加了线程的使用效率,不用耗费资源在创建销毁线程上。
最基础的接口和实现类
下面介绍一下ExecutorService接口中定义的方法。
void shutdown();//优雅的关闭线程池,之前提交的任务将会执行,不接收新的submit。等到线程池中的任务执行完毕后,才退出。 List<Runnable> shutdownNow();//关闭线程池,尝试interrupt线程池中正在执行的工作线程,但是不保证interrupt成功,因为如果线程中没有中断处理的逻辑,interrupt()方法是无法中断线程的。取消任务队列里的任务,并返回这些任务。 boolean isShutdown();//如果调用了shutdown或者shutdownNow,返回true。 boolean isTerminated();//如果shutdown且所有任务都完成了,返回true,如果shutdownNow且成功退出后,返回true。 boolean awaitTermination(long timeout, TimeUnit unit);//阻塞当前线程,直到所有任务(正在执行的任务和任务队列里的任务)执行结束,或超时,或当前线程被中断(抛异常),才返回true。实际作用是监控当前线程池是否已经关闭。 <T> Future<T> submit(Callable<T> task);//提交一个执行任务的Callable,返回一个Future对象,用于获取Callable执行结果。 <T> Future<T> submit(Runnable task, T result);//提交一个执行任务的Runnable,返回一个Future对象,执行结果为传入的result对象。这个方法感觉不好用。 Future<?> submit(Runnable task);//提交一个执行任务的Runnable,返回一个Future对象,执行结果为null。 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)//执行Callable任务集合,执行完毕后,返回Future对象集合。 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)//执行Callable任务集合,执行完毕或者超时以后,返回Future对象集合,其他任务终止。 <T> T invokeAny(Collection<? extends Callable<T>> tasks)//执行Callable任务集合,任意一个任务执行成功,返回结果。 <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)//执行Callable任务集合,任意一个任务执行成功或者超时,返回结果。
下面介绍一下ScheduledExecutorService接口中定义的方法。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);//提交一个延时Runnable任务,只会执行一次。 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);//提交一个延时Callable任务,只会执行一次。 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);//提交一个周期性任务,该任务在initialDelay时长后第一次执行任务,任务间隔时长为period public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);//提交一个周期性任务,该任务在initialDelay时长后第一次执行任务,任务间隔时长为delay
其中,scheduleAtFixedRate和scheduleWithFixedDelay的区别在于,后者定时任务的间隔时间,是从任务执行结束后,开始累加的,前者不是。
scheduleAtFixedRate的时间图:
scheduleWithFixedDelay的时间图:
在这一小节里,将会有几个线程池参数相关的使用示例,帮助我们更深刻的理解线程池的属性。
public void test1() throws InterruptedException{ //初始化线程池:核心线程5个;最大数量10个;超出核心线程数量的线程存活时间:5秒;无界阻塞队列;默认拒绝策略 ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); testCommon(executor); } /**公共的测试方法,传入一个线程池 * @throws InterruptedException */ public void testCommon(ThreadPoolExecutor threadPoolExecutor) throws InterruptedException{ //提交15个任务,每个任务执行时间3秒 for(int i = 1;i <= 15; i++){ int n = i; Runnable task = new Runnable() { @Override public void run() { try { Thread.sleep(3000); System.err.println("任务执行完毕-" + n); } catch (InterruptedException e) { e.printStackTrace(); } } }; threadPoolExecutor.submit(task);//提交任务 System.out.println("任务" + n + "已提交(可能被拒绝,过会打印执行完毕就是没被拒绝)"); } //等待0.5秒 Thread.sleep(500); //提交完15个任务以后,查看队列的相关信息 System.out.println("提交500毫秒后线程池中的工作线程的数量" + threadPoolExecutor.getPoolSize()); System.out.println("提交500毫秒后线程池中的任务队列的任务数量" + threadPoolExecutor.getQueue().size()); Thread.sleep(15000); //等待15.5秒 System.out.println("提交15秒后线程池的数量" + threadPoolExecutor.getPoolSize()); System.out.println("提交15秒后线程池等待线程的数量" + threadPoolExecutor.getQueue().size()); }
在一次性提交完15个任务过后500ms,线程池中任务队列的数量为10个,工作线程的数量是5个。这是为什么呢?原理如下:在刚创建线程池的时候,此时池中的工作线程个数为核心线程个数。如果来了一个任务,而此时有空闲的线程,那么直接交给空闲线程处理。当所有的核心线程都在忙碌,并在此时又来了一个新任务,那么线程池会把任务交给任务队列,只要任务队列没满,新任务就一直放到队列里。在这个例子中,我们使用的是无界队列,上限无限制的,所以就有了开头的现象,任务队列数量为10个。那有人可能问,最大线程数是干嘛的?这里说明一下,当核心线程都在忙碌,并且任务队列满了时,这时如果提交新任务,线程池就会判断,工作线程的数量是否达到了最大线程数量,如果没达到,就会创建线程,执行新任务,如果达到了最大线程数量,就会执行拒绝策略。
public void test2() throws InterruptedException{ //初始化线程池:核心线程5个;最大数量10个;超出核心线程数量的线程存活时间:5秒;有界队列 ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS ,new ArrayBlockingQueue<Runnable>(3), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("线程池满了,执行拒绝策略,不添加任务"); } }); testCommon(executor); }
这个线程池的执行结果是,前5个任务,直接由核心线程执行,第6-8个任务,被线程池添加到任务队列里,第9-13个任务,线程池会创建新的线程处理,第14、15个任务,会拒绝处理。并且由于每个任务实际上是阻塞3秒,等同于1-5、9-13这10个任务所在的线程,会近乎于同时阻塞3秒,等这10个线程中的某个线程执行任务完毕后,线程池会将队列中的任务(第6-8个任务),交给这个空闲线程处理。
public void test3() throws InterruptedException{ //核心数量为0,说明线程池一开始没有任何线程。提交的15个任务,都会直接进入队列中等待。 //但是,SynchronousQueue这种队列,实际上不是一个真正的队列,它没有为元素维护存储空间。线程池将任务放到队列的操作就会失败。 //因此,线程池就会创建一个临时线程,来处理这个任务。(因为临时线程的上限是Integer.MAX_VALUE) //这种线程池的作用:如果来了一个任务,有空闲线程,就用空闲线程,没有空闲线程,就开一个新线程处理。实际环境不能用Integer.MAX_VALUE。 ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); testCommon(executor); }
public void test4() throws InterruptedException{ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5); for(int i = 1; i <= 100; i++){ executor.schedule(new Runnable() { @Override public void run() { System.out.println("执行任务:" + Thread.currentThread().getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }, 2, TimeUnit.SECONDS); } }
这个线程池的执行结果是,每隔1秒钟,控制台打印5行"执行任务:线程名"。为什么是隔1秒,不是隔2秒?因为这100个任务,实际是阻塞线程1秒钟。当100个任务提交以后,过了2秒后,线程池中的5个线程,开始执行5个任务,5个线程执行任务,看上去总共只需要花1秒(因为任务实际就是阻塞1秒,阻塞完就完了),等到1秒过后,只要有线程的任务结束了,就再取新的任务执行。因此是间隔1秒。