java提供了快速创建线程池的方法,即使用java.util.concurrent包提供的Executors类进行线程池的创建,Executors提供了多种常见线程池的创建方式,其中:
//创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 public static ExecutorService newFixedThreadPool(int nThreads); //创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。 public static ExecutorService newSingleThreadExecutor() //创建一个定长线程池,支持定时及周期性任务执行。 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) //创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 public static ExecutorService newCachedThreadPool() 复制代码
以上四种线程池几乎涵盖了我们日常普通编码时遇到的大多数场景,其用法大同小异, 本篇将着重使用newSingleThreadExecutor及newFixedThreadPool两种场景,配合Runnable/Callable/Future进行线程池经典用法的讲解 。
我们先来看一段代码:
public int doSomething() { try { System.out.println("do something."); Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return (new Random(1000)).nextInt(); } 复制代码
简单来看,我们需要用100ms的时间去模拟一段任务,然后返回一个随机数。以此功能为基础,我们知道引入多线程可以完成任务并发,减少任务耗时,提高系统处理任务的能力。
那么现在思考几个问题:
对以上需求修改:
不说思路,直接看代码:
public class ThreadPool { public static ExecutorService es = Executors.newFixedThreadPool(20); public static void main(String[] args) throws InterruptedException { fixedThreadPool(5); Thread.sleep(2000); //模拟请求发起时间的不确定性。 fixedThreadPool(50); fixedThreadPool(4); fixedThreadPool(6); System.out.println("submit task all"); return; } private static void fixedThreadPool(int count) { long startTime = System.currentTimeMillis(); Executors.newSingleThreadExecutor().execute(new Runnable() { @Override public void run() { Map<Integer, Future<Integer>> futures = new HashMap<>(); //加入5个Callable任务,直到返回结果 for (int i = 1; i <= count; i++) { final long time = System.currentTimeMillis(); final int task = i; Future<Integer> future = es.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("TaskFlag: " + count + ", Install MCS plugins " + task + " is working."); return (new Random(1000)).nextInt(); } }); //future.get(); //次步骤将阻塞线程的提交流程,但并不会阻塞线程池中的任务。 futures.put(task, future); } for (int i = 1; i <= count; i++) { try { System.out.println("taskID: " + i + ", Res: " + futures.get(i).get()); } catch (Exception e) { e.printStackTrace(); } } System.out.println(count + " 个 安装MCS 插件任务submit完成!Time:" + (System.currentTimeMillis() - startTime)); //虽然shutdown方法是等所有任务跑完后才真正停掉线程池,但该方法不会造成堵塞,也就是这代码运行后,下一行代码会立刻运行 //es.shutdown(); System.out.println("Executors.newSingleThreadExecutor().execute : shutdown后退出! Time:" + (System.currentTimeMillis() - startTime)); } }); } } 复制代码
1、
public static ExecutorService es = Executors.newFixedThreadPool(20); 复制代码
根据系统实现能力创建的共享线程池,数值可以根据系统承载能力进行计算,如CPU个数,这里不进行扩展。
2、
Executors.newSingleThreadExecutor().execute(new Runnable() { @Override public void run() { //do somethings } }); 复制代码
在每个请求中,通过单线程线程池直接执行一个匿名类的 run
方法,执行操作。 Java java.util.concurrent
包中提供的单线程线程池将返回一个 ExecutorService
对象,我们再来看 ExecutorService
的定义:
ExecutorService
实现了
Executor
接口:
因此可以通过 Executors.newSingleThreadExecutor()
创建的单线程线程池的 execute
方法执行一个线程。
execute
方法接受一个实现了 Runnable
接口的类的对象,可以是实际的类或匿名类,本例中使用了匿名类的方式;
本例中通过单线程池方式实现请求的异步处理。
3、
Future<Integer> future = es.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("TaskFlag: " + count + ", Install MCS plugins " + task + " is working."); return (new Random(1000)).nextInt(); } }); futures.put(task, future); 复制代码
先看看本例中使用的线程池: es
我们回到java源码:
同理我们发现 newFixedThreadPool
同样返回一个 ExecutorService
对象,java源码中,此方法返回一个 LinkedBlockingQueue<Runnable>
对象。但本篇中不详细讲解,有兴趣可进一步读java源码; 创建定长线程池时在本例中,使用 submit
的方式来执行一个任务:
Future<Integer> future = es.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { //do something return (new Random(1000)).nextInt(); } }); 复制代码
Future
对象进行返回结果查询 execute
的方式执行,定长线程池使用 submit
的方式执行; Runnable
接口的类,定长线程池使用实现 Callable
接口的类; Future
对象进行返回结果查询 现在先了解一下 Future
类: 我们先看看 submit
方法的定义: 我对此注释进行了简要翻译:
提交一个有返回值的任务,方法将返回一个Future类型的结果,使用Future提供的get函数将在线程完全执行成功后,得到线程的执行结果。 注释中对Future类型做了简要解释:表示一个挂起的任务的执行结果。 复制代码
在对 Future
类型的定义进行了查找:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } 复制代码
官方的注释进行比较详细的说明( 这里是重点 )
我认为不需要过多解释我们就可以明确为什么会使用 Future
的原因: 我们使用 Future
作为任务执行结果的一个获取,而此种获取方式是异步的,因此我们通过 submit
方式提交一个任务时,将返回一个 Future
对象作为当前提交线程的 控制对象
execute
的方式执行,定长线程池使用 submit
的方式执行? 通过对第一个问题的简单分析,我们可以明确到,本例中,需要对执行结果进行处理(虽然在本例中仅仅做了打印),那么我们所执行的线程对象必然要有返回值,那么 Executor
接口提供的 execute
没有返回值;
Runnable
接口的类,定长线程池使用实现 Callable
接口的类; 分析完以上两个问题,那么第三个问题就迎刃而解了 Runnable
不支持线程返回结果,而 Callable
接口支持; Runnable
与 Callable
的核心差别: Runnable
不支持线程返回结果,而 Callable
接口支持; submit
与 execute
作用相同,都用于执行线程池中的线程,但 execute
无返回值,但 submit
可返回线程的执行结果, submit
也可以提交 Runnable
接口实现的线程对象 TaskFlag: 5, Install MCS plugins 3 is working. TaskFlag: 5, Install MCS plugins 2 is working. TaskFlag: 5, Install MCS plugins 1 is working. TaskFlag: 5, Install MCS plugins 5 is working. taskID: 1, Res: -1244746321 taskID: 2, Res: -1244746321 taskID: 3, Res: -1244746321 TaskFlag: 5, Install MCS plugins 4 is working. taskID: 4, Res: -1244746321 taskID: 5, Res: -1244746321 5 个 安装MCS 插件任务submit完成!Time:114 Executors.newSingleThreadExecutor().execute : shutdown后退出! Time:114 submit task all TaskFlag: 50, Install MCS plugins 4 is working. TaskFlag: 50, Install MCS plugins 2 is working. TaskFlag: 6, Install MCS plugins 1 is working. TaskFlag: 50, Install MCS plugins 3 is working. TaskFlag: 6, Install MCS plugins 2 is working. taskID: 1, Res: -1244746321 taskID: 2, Res: -1244746321 TaskFlag: 6, Install MCS plugins 5 is working. TaskFlag: 6, Install MCS plugins 3 is working. taskID: 3, Res: -1244746321 TaskFlag: 6, Install MCS plugins 6 is working. TaskFlag: 4, Install MCS plugins 3 is working. TaskFlag: 50, Install MCS plugins 8 is working. TaskFlag: 4, Install MCS plugins 2 is working. TaskFlag: 50, Install MCS plugins 7 is working. TaskFlag: 4, Install MCS plugins 4 is working. TaskFlag: 4, Install MCS plugins 1 is working. TaskFlag: 50, Install MCS plugins 9 is working. taskID: 1, Res: -1244746321 TaskFlag: 50, Install MCS plugins 10 is working. taskID: 2, Res: -1244746321 taskID: 3, Res: -1244746321 taskID: 4, Res: -1244746321 4 个 安装MCS 插件任务submit完成!Time:104 Executors.newSingleThreadExecutor().execute : shutdown后退出! Time:104 TaskFlag: 50, Install MCS plugins 6 is working. TaskFlag: 50, Install MCS plugins 5 is working. TaskFlag: 6, Install MCS plugins 4 is working. taskID: 4, Res: -1244746321 taskID: 5, Res: -1244746321 taskID: 6, Res: -1244746321 6 个 安装MCS 插件任务submit完成!Time:105 Executors.newSingleThreadExecutor().execute : shutdown后退出! Time:105 TaskFlag: 50, Install MCS plugins 1 is working. taskID: 1, Res: -1244746321 taskID: 2, Res: -1244746321 taskID: 3, Res: -1244746321 taskID: 4, Res: -1244746321 taskID: 5, Res: -1244746321 taskID: 6, Res: -1244746321 taskID: 7, Res: -1244746321 taskID: 8, Res: -1244746321 taskID: 9, Res: -1244746321 taskID: 10, Res: -1244746321 TaskFlag: 50, Install MCS plugins 11 is working. taskID: 11, Res: -1244746321 TaskFlag: 50, Install MCS plugins 13 is working. TaskFlag: 50, Install MCS plugins 12 is working. taskID: 12, Res: -1244746321 taskID: 13, Res: -1244746321 TaskFlag: 50, Install MCS plugins 16 is working. TaskFlag: 50, Install MCS plugins 14 is working. TaskFlag: 50, Install MCS plugins 17 is working. TaskFlag: 50, Install MCS plugins 19 is working. TaskFlag: 50, Install MCS plugins 21 is working. TaskFlag: 50, Install MCS plugins 18 is working. TaskFlag: 50, Install MCS plugins 15 is working. TaskFlag: 50, Install MCS plugins 28 is working. TaskFlag: 50, Install MCS plugins 26 is working. TaskFlag: 50, Install MCS plugins 24 is working. TaskFlag: 50, Install MCS plugins 27 is working. TaskFlag: 50, Install MCS plugins 29 is working. TaskFlag: 50, Install MCS plugins 20 is working. taskID: 14, Res: -1244746321 taskID: 15, Res: -1244746321 taskID: 16, Res: -1244746321 taskID: 17, Res: -1244746321 taskID: 18, Res: -1244746321 taskID: 19, Res: -1244746321 taskID: 20, Res: -1244746321 taskID: 21, Res: -1244746321 TaskFlag: 50, Install MCS plugins 25 is working. TaskFlag: 50, Install MCS plugins 23 is working. TaskFlag: 50, Install MCS plugins 22 is working. taskID: 22, Res: -1244746321 taskID: 23, Res: -1244746321 taskID: 24, Res: -1244746321 taskID: 25, Res: -1244746321 taskID: 26, Res: -1244746321 taskID: 27, Res: -1244746321 taskID: 28, Res: -1244746321 taskID: 29, Res: -1244746321 TaskFlag: 50, Install MCS plugins 30 is working. taskID: 30, Res: -1244746321 TaskFlag: 50, Install MCS plugins 31 is working. taskID: 31, Res: -1244746321 TaskFlag: 50, Install MCS plugins 33 is working. TaskFlag: 50, Install MCS plugins 32 is working. taskID: 32, Res: -1244746321 taskID: 33, Res: -1244746321 TaskFlag: 50, Install MCS plugins 34 is working. taskID: 34, Res: -1244746321 TaskFlag: 50, Install MCS plugins 37 is working. TaskFlag: 50, Install MCS plugins 36 is working. TaskFlag: 50, Install MCS plugins 35 is working. taskID: 35, Res: -1244746321 taskID: 36, Res: -1244746321 taskID: 37, Res: -1244746321 TaskFlag: 50, Install MCS plugins 45 is working. TaskFlag: 50, Install MCS plugins 38 is working. TaskFlag: 50, Install MCS plugins 40 is working. TaskFlag: 50, Install MCS plugins 42 is working. taskID: 38, Res: -1244746321 TaskFlag: 50, Install MCS plugins 44 is working. TaskFlag: 50, Install MCS plugins 43 is working. TaskFlag: 50, Install MCS plugins 41 is working. TaskFlag: 50, Install MCS plugins 39 is working. taskID: 39, Res: -1244746321 taskID: 40, Res: -1244746321 taskID: 41, Res: -1244746321 taskID: 42, Res: -1244746321 taskID: 43, Res: -1244746321 taskID: 44, Res: -1244746321 taskID: 45, Res: -1244746321 TaskFlag: 50, Install MCS plugins 49 is working. TaskFlag: 50, Install MCS plugins 48 is working. TaskFlag: 50, Install MCS plugins 47 is working. TaskFlag: 50, Install MCS plugins 46 is working. taskID: 46, Res: -1244746321 taskID: 47, Res: -1244746321 taskID: 48, Res: -1244746321 taskID: 49, Res: -1244746321 TaskFlag: 50, Install MCS plugins 50 is working. taskID: 50, Res: -1244746321 50 个 安装MCS 插件任务submit完成!Time:308 Executors.newSingleThreadExecutor().execute : shutdown后退出! Time:308 复制代码
1、Future提供的get方法是阻塞的,它将阻塞当前运行方法,直至有结果返回 因此,我们这里有一种错误的用法: 批量提交任务时,在提交任务后立即执行future.get() 此操作会阻塞线程的提交,导致线程池的能力不能发挥到最大;
2、ExecutorService提供了shutdown方法和shutdownNow方法,区别在于shutdown将与线程池等待队列中的所有线程执行完成后退出; 而shutdownNow则是强制退出,无论是否有任务执行。