转载

[Java并发-3]线程池应用以及原理剖析

线程池应用以及原理剖析

1.线程池的意义

我们为什么要用线程池?讨论这个问题之前,应该先说明为什么使用多线程。使用多线程,本质上就是提高程序性能。我们经常说,多线程能让程序更快,快在哪里?所以应该说明一下,如何衡量性能的高低。

性能的核心指标有2个,吞吐量和延迟。吞吐量是指,单位时间内能处理的请求的数量,吞吐量越大,意味着能处理的请求越多。延迟是指,从发出请求到收到响应的间隔时间,延迟越小,说明速度越快。这2个指标内部有一定的联系,同等条件下,延迟越小,吞吐量越大。但是不能相互转换,因为是不同维度的概念,延迟是时间维度,吞吐量是空间维度。

那我们想要提升程序的性能,就得从这2个方面入手。降低延迟,提高吞吐量。基本上有2种手段,一是发挥算法的优势,二是发挥硬件的优势。后者与并发编程息息相关。那从硬件的角度来说,提高硬件的效率,无非是提高CPU的使用率和IO设备的使用率。如果只有1个线程工作,那么这个线程使用CPU的时候,IO设备肯定是空闲的,使用IO设备的时候,CPU肯定是空闲的。这就存在了资源的浪费。假设一个接口,CPU处理时间100毫秒,IO时间100毫秒,那么单线程环境下,1秒中能响应5个请求。如果有2个线程,一个线程在使用CPU的时候,另一个使用IO,这种情况下,CPU和IO的利用率就是100%,200毫秒能处理2个请求(忽略线程切换成本),1秒中处理10个请求,吞吐量多了一倍。

好了,我们梳理完为什么使用多线程,再来看看为什么用线程池。首先,线程不能无节制的创建,线程在操作系统中是一种资源,创建销毁都需要时间空间。其次,根据JVM规范,一个线程默认的最大栈空间是1M,这个空间是从系统内存中分配的,线程太多会占用大量的内存。再次,线程过多,会频繁的切换上下文,这是多余的损耗。还有,如果我们创建销毁一个线程总共需要10ms,而任务只需要执行5ms,那么按照传统的 创建线程->执行任务->销毁线程 这个流程,实际干活仅花了小部分时间,不如不销毁线程,因此出现了线程池:创建一定数量的线程执行任务,任务来的时候,直接从线程池取线程,任务结束了就把线程归还到线程池中。这样大大增加了线程的使用效率,不用耗费资源在创建销毁线程上。

2.线程池的组成

  • 线程池管理器:创建并管理线程池。例如创建线程池、销毁线程池、添加新任务等等。
  • 工作线程:线程池中的工作线程。空闲时处于空闲状态,可以循环的执行任务。
  • 任务接口:每个任务必须实现任务接口,以供工作线程调度任务的执行。
  • 任务队列:存放待处理任务的队列。

3.Java线程池API简介

最基础的接口和实现类

  • 接口:Executor,最上层的接口,只定义了执行任务的方法 execute(Runnable command)
  • 接口:ExecutorService,继承Executor接口,拓展了一些方法。
  • 接口:ScheduledExecutorService,继承ExecutorService接口,拓展了定时任务相关的一些方法。
  • 类:ThreadPoolExecutor,实现了ExecutorService接口,是最基础最标准的线程池实现类。
  • 类:ScheduledThreadPoolExecutor,继承ThreadPoolExecutor,实现了ScheduledExecutorService接口,拥有定时任务相关功能。

下面介绍一下ExecutorService接口中定义的方法。

void shutdown();//优雅的关闭线程池,之前提交的任务将会执行,不接收新的submit。等到线程池中的任务执行完毕后,才退出。
List<Runnable> shutdownNow();//关闭线程池,尝试interrupt线程池中正在执行的工作线程,但是不保证interrupt成功,因为如果线程中没有中断处理的逻辑,interrupt()方法是无法中断线程的。取消任务队列里的任务,并返回这些任务。 
boolean isShutdown();//如果调用了shutdown或者shutdownNow,返回true。
boolean isTerminated();//如果shutdown且所有任务都完成了,返回true,如果shutdownNow且成功退出后,返回true。
boolean awaitTermination(long timeout, TimeUnit unit);//阻塞当前线程,直到所有任务(正在执行的任务和任务队列里的任务)执行结束,或超时,或当前线程被中断(抛异常),才返回true。实际作用是监控当前线程池是否已经关闭。
<T> Future<T> submit(Callable<T> task);//提交一个执行任务的Callable,返回一个Future对象,用于获取Callable执行结果。
<T> Future<T> submit(Runnable task, T result);//提交一个执行任务的Runnable,返回一个Future对象,执行结果为传入的result对象。这个方法感觉不好用。
Future<?> submit(Runnable task);//提交一个执行任务的Runnable,返回一个Future对象,执行结果为null。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)//执行Callable任务集合,执行完毕后,返回Future对象集合。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)//执行Callable任务集合,执行完毕或者超时以后,返回Future对象集合,其他任务终止。
<T> T invokeAny(Collection<? extends Callable<T>> tasks)//执行Callable任务集合,任意一个任务执行成功,返回结果。
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)//执行Callable任务集合,任意一个任务执行成功或者超时,返回结果。

下面介绍一下ScheduledExecutorService接口中定义的方法。

ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);//提交一个延时Runnable任务,只会执行一次。
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);//提交一个延时Callable任务,只会执行一次。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);//提交一个周期性任务,该任务在initialDelay时长后第一次执行任务,任务间隔时长为period
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);//提交一个周期性任务,该任务在initialDelay时长后第一次执行任务,任务间隔时长为delay

其中,scheduleAtFixedRate和scheduleWithFixedDelay的区别在于,后者定时任务的间隔时间,是从任务执行结束后,开始累加的,前者不是。

scheduleAtFixedRate的时间图:

[Java并发-3]线程池应用以及原理剖析

scheduleWithFixedDelay的时间图:

[Java并发-3]线程池应用以及原理剖析

4.线程池的使用示例

在这一小节里,将会有几个线程池参数相关的使用示例,帮助我们更深刻的理解线程池的属性。

  • 初始化线程池,核心线程数量5个,最大线程数量10个,超出核心线程数的线程存活时间为5秒,任务队列使用无界队列,拒绝策略采取默认的。
public void test1() throws InterruptedException{
        //初始化线程池:核心线程5个;最大数量10个;超出核心线程数量的线程存活时间:5秒;无界阻塞队列;默认拒绝策略
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 
                10, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        testCommon(executor);
    }
    
    /**公共的测试方法,传入一个线程池
     * @throws InterruptedException */
    public void testCommon(ThreadPoolExecutor threadPoolExecutor) throws InterruptedException{
        //提交15个任务,每个任务执行时间3秒
        for(int i = 1;i <= 15; i++){
            int n = i;
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(3000);
                        System.err.println("任务执行完毕-" + n);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            threadPoolExecutor.submit(task);//提交任务
            System.out.println("任务" + n + "已提交(可能被拒绝,过会打印执行完毕就是没被拒绝)");
        }
        //等待0.5秒
        Thread.sleep(500);
        //提交完15个任务以后,查看队列的相关信息
        System.out.println("提交500毫秒后线程池中的工作线程的数量" + threadPoolExecutor.getPoolSize());
        System.out.println("提交500毫秒后线程池中的任务队列的任务数量" + threadPoolExecutor.getQueue().size());
        
        Thread.sleep(15000);
        //等待15.5秒
        System.out.println("提交15秒后线程池的数量" + threadPoolExecutor.getPoolSize());
        System.out.println("提交15秒后线程池等待线程的数量" + threadPoolExecutor.getQueue().size());
    }

在一次性提交完15个任务过后500ms,线程池中任务队列的数量为10个,工作线程的数量是5个。这是为什么呢?原理如下:在刚创建线程池的时候,此时池中的工作线程个数为核心线程个数。如果来了一个任务,而此时有空闲的线程,那么直接交给空闲线程处理。当所有的核心线程都在忙碌,并在此时又来了一个新任务,那么线程池会把任务交给任务队列,只要任务队列没满,新任务就一直放到队列里。在这个例子中,我们使用的是无界队列,上限无限制的,所以就有了开头的现象,任务队列数量为10个。那有人可能问,最大线程数是干嘛的?这里说明一下,当核心线程都在忙碌,并且任务队列满了时,这时如果提交新任务,线程池就会判断,工作线程的数量是否达到了最大线程数量,如果没达到,就会创建线程,执行新任务,如果达到了最大线程数量,就会执行拒绝策略。

  • 初始化线程池,核心线程数量5个,最大线程数量10个,超出核心线程数的线程存活时间为5秒,任务队列使用有界队列(长度为3),拒绝策略就是打印信息,不执行任务。
public void test2() throws InterruptedException{
        //初始化线程池:核心线程5个;最大数量10个;超出核心线程数量的线程存活时间:5秒;有界队列
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS
                ,new ArrayBlockingQueue<Runnable>(3), new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println("线程池满了,执行拒绝策略,不添加任务");
                    }
                });
        
        testCommon(executor);
    }

这个线程池的执行结果是,前5个任务,直接由核心线程执行,第6-8个任务,被线程池添加到任务队列里,第9-13个任务,线程池会创建新的线程处理,第14、15个任务,会拒绝处理。并且由于每个任务实际上是阻塞3秒,等同于1-5、9-13这10个任务所在的线程,会近乎于同时阻塞3秒,等这10个线程中的某个线程执行任务完毕后,线程池会将队列中的任务(第6-8个任务),交给这个空闲线程处理。

  • 初始化线程池,核心线程数量0个,最大线程数量Integer.MAX_VALUE个,超出核心线程数的线程存活时间为60秒,任务队列使用同步队列,默认拒绝策略。
public void test3() throws InterruptedException{
        //核心数量为0,说明线程池一开始没有任何线程。提交的15个任务,都会直接进入队列中等待。
        //但是,SynchronousQueue这种队列,实际上不是一个真正的队列,它没有为元素维护存储空间。线程池将任务放到队列的操作就会失败。
        //因此,线程池就会创建一个临时线程,来处理这个任务。(因为临时线程的上限是Integer.MAX_VALUE)
        //这种线程池的作用:如果来了一个任务,有空闲线程,就用空闲线程,没有空闲线程,就开一个新线程处理。实际环境不能用Integer.MAX_VALUE。
        ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60,
                TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        testCommon(executor);
    }
  • 初始化定时任务类型的线程池,核心线程数为5,提交任务2秒后执行。提交100个任务。
public void test4() throws InterruptedException{
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5);
        for(int i = 1; i <= 100; i++){
            executor.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println("执行任务:" + Thread.currentThread().getName());
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, 2, TimeUnit.SECONDS);
        }
    }

这个线程池的执行结果是,每隔1秒钟,控制台打印5行"执行任务:线程名"。为什么是隔1秒,不是隔2秒?因为这100个任务,实际是阻塞线程1秒钟。当100个任务提交以后,过了2秒后,线程池中的5个线程,开始执行5个任务,5个线程执行任务,看上去总共只需要花1秒(因为任务实际就是阻塞1秒,阻塞完就完了),等到1秒过后,只要有线程的任务结束了,就再取新的任务执行。因此是间隔1秒。

原文  https://segmentfault.com/a/1190000020252437
正文到此结束
Loading...