介绍线程池之前先简要了解一下Executor,ExecutorService,Future,Callable,Executors是什么,和线程池又有什么关系
它是线程池顶级接口。它定义了一个方法 void execute(Runnable) 。
这个方法是用于处理任务的一个服务方法,调用者提供Runnable接口的实现,线程池通过线程执行这个Runnable,该服务方法是无返回值的
) ,所有的线程池类型都实现这个接口
顾名思义,Future->未来,代表线程任务执行结束后的结果。
获取线程执行结果的方式是通过get方法获取的,get有两种方式,有参和无参
无参 T get()
->阻塞等待线程执行结束,并得到结果。
T get(long, TimeUnit)
->阻塞固定时长,等待线程执行结束后的结果,如果在阻塞时长范围内,线程未执行结束,抛出异常。
Callable类似Runnable接口,它有一个call方法,它的作用和Runnable中的run方法完全一致,但也有区别 Callable的call->有返回值,可以抛出任意异常 Runnable的run-> 无返回值,不能抛出未检查的异常
call方法的返回值就是Future中get方法的返回值
Executors是一个工具类,类似Collection和Collections的关系,可以更简单的创建若干种线程池,通过Executors可以直接得到想要的线程池
线程池状态: Running, ShuttingDown, Termitnaed
FixedThreadPool是固定容量线程池,创建线程池的时候容量固定,使用的是BlockingQueue作为任务的载体,线程池默认的容量上限是Integer.MAX_VALUE
下面是一个无返回值的小案例:
案例中创建了一个线程池,容量为5,执行6个任务,分析调用shutdown方法后,分析任务的执行情况
/** * 线程池 * 固定容量线程池 */ package com.bernardlowe.concurrent.t08; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class Test_02_FixedThreadPool { public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(5); for(int i = 0; i < 6; i++){ service.execute(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " - test executor"); } }); } System.out.println("初始状态:" + service); System.out.println("开始调用shutdown方法====="); service.shutdown(); // 是否已经结束, 相当于回收了资源。 System.out.println("是否terminated:" + service.isTerminated()); // 是否已经关闭, 是否调用过shutdown方法 System.out.println("是否shutdown:" + service.isShutdown()); System.out.println("shutdown后的状态:" + service); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } // service.shutdown(); System.out.println("2秒过后任务全部执行完===="); System.out.println("是否terminated:" + service.isTerminated()); System.out.println("是否shutdown:" + service.isShutdown()); System.out.println("任务全部执行完过后状态:" + service); } } 复制代码
结果:
从图中可以分析出以下几个过程
在初始状态:五个执行线程,1个任务在等待队列,0个完成任务
↓
调用shutdown方法后:线程池未关闭(terminated为false),调用了shutdown(不再接收新任务),0个完成任务
↓
两秒后任务执行完毕:线程池已关闭(terminated为true),调用了shutdown(不再接收新任务),6个完成任务
下面是一个有返回值的小案例:案例中创建了一个线程池,容量为1,submit方法传了一个Callable,future通过get获取线程的返回值
/** * 线程池 * 固定容量线程池(有返回值) */ package com.bernardlowe.concurrent.t08; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; public class Test_03_Future { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService service = Executors.newFixedThreadPool(1); Future<String> future = service.submit(new Callable<String>() { @Override public String call() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return Thread.currentThread().getName() + " - test executor"; } }); System.out.println("线程是否结束: " + future.isDone()); // 查看线程是否结束, 任务是否完成。 call方法是否执行结束 System.out.println("call方法的返回值: " + future.get()); // 获取call方法的返回值。 System.out.println("线程是否结束: " + future.isDone()); // 关闭线程池 service.shutdown(); } } 复制代码
结果:
缓存的线程池, 容量不限(Integer.MAX_VALUE),自动扩容
容量管理策略:如果线程池中的线程数量不满足任务执行,创建新的线程。每次有新任务无法即时处理的时候,都会创建新的线程。当线程池中的线程空闲时长达到一定的临界值(默认60秒),自动释放线程,这里通过Executors.newCachedThreadPool()方法得到的线程池无法修改空闲时间,具体原因见下图,但可以通过自定义线程池ThreadPoolExecutor修改,具体方法见2.5,这里就不解释了
应用场景: 内部应用或测试应用。
案例演示:
/** * 线程池 * 无容量限制的线程池(最大容量默认为Integer.MAX_VALUE) */ package com.bernardlowe.concurrent.t08; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class Test_05_CachedThreadPool { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); System.out.println(service); for(int i = 0; i < 5; i++){ service.execute(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " - test executor"); } }); } System.out.println(service); try { TimeUnit.SECONDS.sleep(65); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(service); } } 复制代码
ScheduledThreadPool是计划任务线程池,可以根据计划自动执行任务的线程池,底层实现是一个DelayedWorkQueue,它的一个主要方法scheduleAtFixedRate
有以下几个参数:
案例:
/** * 线程池 * 计划任务线程池。 */ package com.bernardlowe.concurrent.t08; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class Test_07_ScheduledThreadPool { public static void main(String[] args) { ScheduledExecutorService service = Executors.newScheduledThreadPool(3); System.out.println(service); // 定时完成任务。 scheduleAtFixedRate(Runnable, start_limit, limit, timeunit) // runnable - 要执行的任务。 service.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } }, 0, 300, TimeUnit.MILLISECONDS); } } 复制代码
单一容量的线程池,用法和FixedThreadPool类似,但和newFixedThreadPool不一样的是newSingleThreadExecutor创建的线程池又被一个FinalizableDelegatedExecutorService包装了一下
总结一下SingleThreadExecutor:
https://www.jianshu.com/p/2b7d853322bb
分支合并线程池(mapduce类似的设计思想),可以递归完成复杂任务,适合用于处理复杂任务 要求可分支合并的任务必须是ForkJoinTask类型的子类型 ForkJoinTask类型提供了两个抽象子类型:
RecursiveTask有返回结果的分支合并任务
RecursiveAction无返回结果的分支合并任务
案例: 这个案例做了一个以ForkJoinPool实现的数据累加,当计算数字区间大于MAX_SIZE=50000时,开启新的线程任务的计算,最后合并统计结果
/** * 线程池 * 分支合并线程池。 */ package com.bernardlowe.concurrent.t08; import java.io.IOException; import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; public class Test_08_ForkJoinPool { final static int[] numbers = new int[1000000]; final static int MAX_SIZE = 500000; final static Random r = new Random(); static{ for(int i = 0; i < numbers.length; i++){ numbers[i] = r.nextInt(1000); } } static class AddTask extends RecursiveTask<Long>{ // RecursiveAction int begin, end; public AddTask(int begin, int end){ this.begin = begin; this.end = end; } // protected Long compute(){ if((end - begin) < MAX_SIZE){ long sum = 0L; for(int i = begin; i < end; i++){ sum += numbers[i]; } // System.out.println("form " + begin + " to " + end + " sum is : " + sum); return sum; }else{ int middle = begin + (end - begin)/2; AddTask task1 = new AddTask(begin, middle); AddTask task2 = new AddTask(middle, end); task1.fork();// 就是用于开启新的任务的。 就是分支工作的。 就是开启一个新的线程任务。 task2.fork(); // join - 合并。将任务的结果获取。 这是一个阻塞方法。一定会得到结果数据。 return task1.join() + task2.join(); } } } public static void main(String[] args) throws InterruptedException, ExecutionException, IOException { long result = 0L; for(int i = 0; i < numbers.length; i++){ result += numbers[i]; } System.out.println(result); ForkJoinPool pool = new ForkJoinPool(); AddTask task = new AddTask(0, numbers.length); Future<Long> future = pool.submit(task); System.out.println(future.get()); } } 复制代码
结果:该任务分类四个线程任务进行计算,最后汇总
ThreadPoolExecutor线程池的底层实现,除ForkJoinPool外,其他常用线程池底层都是使用ThreadPoolExecutor实现的,其中有一个构造方法如下:
案例:
/** * 线程池 * 固定容量线程池 */ package com.bernardlowe.concurrent.t08; import java.util.ArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Test_09_ThreadPoolExecutor { public static void main(String[] args) { // 模拟fixedThreadPool, 核心线程5个,最大容量5个,线程的生命周期无限。 ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); for(int i = 0; i < 6; i++){ service.execute(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " - test executor"); } }); } System.out.println(service); service.shutdown(); System.out.println(service.isTerminated()); System.out.println(service.isShutdown()); System.out.println(service); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } service.shutdown(); System.out.println(service.isTerminated()); System.out.println(service.isShutdown()); System.out.println(service); } } 复制代码