马上就要过年了,还在岗位上坚守“swimming”的小伙伴们顶住。博主给大家带来一篇线程池的基本使用解解闷。
1、减少线程创建与切换的开销
2、控制线程的数量
重复利用有限的线程
其实常用Java线程池本质上都是由 ThreadPoolExecutor
或者 ForkJoinPool
生成的,只是其根据构造函数传入不同的实参来实例化相应线程池而已。
Executors
是一个线程池工厂类,该工厂类包含如下集合静态工厂方法来创建线程池:
newFixedThreadPool() newSingleThreadExecutor() newCachedThreadPool() newWorkStealingPool() newScheduledThreadPool()
对设计模式有了解过的同学都会知道,我们尽量面向接口编程,这样对程序的灵活性是非常友好的。Java线程池也采用了面向接口编程的思想,可以看到 ThreadPoolExecutor
和 ForkJoinPool
所有都是 ExecutorService
接口的实现类。在 ExecutorService
接口中定义了一些常用的方法,然后再各种线程池中都可以使用 ExecutorService
接口中定义的方法,常用的方法有如下几个:
向线程池提交线程
Future<?> submit() void execute(Runnable command)
关闭线程池
void shutdown() List<Runnable> shutdownNow()
检查线程池的状态
boolean isShutdown() boolean isTerminated()
线程池中的线程数目是固定的,不管你来了多少的任务。
public class MyFixThreadPool { public static void main(String[] args) throws InterruptedException { // 创建一个线程数固定为5的线程池 ExecutorService service = Executors.newFixedThreadPool(5); System.out.println("初始线程池状态:" + service); for (int i = 0; i < 6; i++) { service.execute(() -> { try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); } System.out.println("线程提交完毕之后线程池状态:" + service); service.shutdown();//会等待所有的线程执行完毕才关闭,shutdownNow:立马关闭 System.out.println("是否全部线程已经执行完毕:" + service.isTerminated());//所有的任务执行完了,就会返回true System.out.println("是否已经执行shutdown()" + service.isShutdown()); System.out.println("执行完shutdown()之后线程池的状态:" + service); TimeUnit.SECONDS.sleep(5); System.out.println("5秒钟过后,是否全部线程已经执行完毕:" + service.isTerminated()); System.out.println("5秒钟过后,是否已经执行shutdown()" + service.isShutdown()); System.out.println("5秒钟过后,线程池状态:" + service); } } 复制代码
初始线程池状态:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
线程提交完毕之后线程池状态:[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
是否全部线程已经执行完毕:false
是否已经执行shutdown():true
执行完shutdown()之后线程池的状态:[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
pool-1-thread-2
pool-1-thread-1
pool-1-thread-4
pool-1-thread-5
pool-1-thread-3
pool-1-thread-2
5秒钟过后,是否全部线程已经执行完毕:true
5秒钟过后,是否已经执行shutdown():true
5秒钟过后,线程池状态:[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
Running
状态了,但是 pool size
(线程池线程的数量)、 active threads
(当前活跃线程) queued tasks
(当前排队线程)、 completed tasks
(已完成的任务数)都是0 pool size = 5
:因为我们创建的是一个固定线程数为5的线程池(注意:如果这个时候我们只提交了3个任务,那么 pool size = 3
,说明线程池也是通过懒加载的方式去创建线程)。 active threads = 5
:虽然我们向线程池提交了6个任务,但是线程池的固定大小为5,所以活跃线程只有5个 queued tasks = 1
:虽然我们向线程池提交了6个任务,但是线程池的固定大小为5,只能有5个活跃线程同时工作,所以有一个任务在等待 shutdown()
的时候,由于任务还没有全部执行完毕,所以 isTerminated()
返回 false
, shutdown()
返回true,而线程池的状态会由 Running
变为 Shutting down
pool-1-thread-2
执行了两次任务,证明线程池中的线程确实是重复利用的。 isTerminated()
返回 true
, shutdown()
返回 true
,证明所有的任务都执行完了,线程池也关闭了,我们再次检查线程池的状态 [Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
,状态已经处于 Terminated
了,然后已完成的任务显示为6 从头到尾整个线程池都只有一个线程在工作。
public class SingleThreadPool { public static void main(String[] args) { ExecutorService service = Executors.newSingleThreadExecutor(); for (int i = 0; i < 5; i++) { final int j = i; service.execute(() -> { System.out.println(j + " " + Thread.currentThread().getName()); }); } } } 复制代码
0 pool-1-thread-1 1 pool-1-thread-1 2 pool-1-thread-1 3 pool-1-thread-1 4 pool-1-thread-1
程序分析可以看到只有 pool-1-thread-1
一个线程在工作。
来多少任务,就创建多少线程(前提是没有空闲的线程在等待执行任务,否则还是会复用之前旧(缓存)的线程),直接你电脑能支撑的线程数的极限为止。
public class CachePool { public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); System.out.println("初始线程池状态:" + service); for (int i = 0; i < 12; i++) { service.execute(() -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); } System.out.println("线程提交完毕之后线程池状态:" + service); TimeUnit.SECONDS.sleep(50); System.out.println("50秒后线程池状态:" + service); TimeUnit.SECONDS.sleep(30); System.out.println("80秒后线程池状态:" + service); } } 复制代码
初始线程池状态:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
线程提交完毕之后线程池状态:[Running, pool size = 12, active threads = 12, queued tasks = 0, completed tasks = 0]
pool-1-thread-3
pool-1-thread-4
pool-1-thread-1
pool-1-thread-2
pool-1-thread-5
pool-1-thread-8
pool-1-thread-9
pool-1-thread-12
pool-1-thread-7
pool-1-thread-6
pool-1-thread-11
pool-1-thread-10
50秒后线程池状态:[Running, pool size = 12, active threads = 0, queued tasks = 0, completed tasks = 12]
80秒后线程池状态:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 12]
可以在指定延迟后或周期性地执行线程任务的线程池。
newScheduledThreadPool()
方法返回的其实是一个 ScheduledThreadPoolExecutor
对象, ScheduledThreadPoolExecutor
定义如下: public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { 复制代码
ThreadPoolExecutor
并实现了 ScheduledExecutorService
接口,而 ScheduledExecutorService
也是继承了 ExecutorService
接口,所以我们也可以像使用之前的线程池对象一样使用,只不过是该对象会额外多了一些方法用于控制延迟与周期: public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit) public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,ong initialDelay,long delay,TimeUnit unit)
下面代码每500毫秒打印一次当前线程名称以及一个随机数字。
public class MyScheduledPool { public static void main(String[] args) { ScheduledExecutorService service = Executors.newScheduledThreadPool(4); service.scheduleAtFixedRate(() -> { System.out.println(Thread.currentThread().getName() + new Random().nextInt(1000)); }, 0, 500, TimeUnit.MILLISECONDS); } } 复制代码
每个线程维护着自己的队列,执行完自己的任务之后,会去主动执行其他线程队列中的任务。
public class MyWorkStealingPool { public static void main(String[] args) throws IOException { ExecutorService service = Executors.newWorkStealingPool(4); System.out.println("cpu核心:" + Runtime.getRuntime().availableProcessors()); service.execute(new R(1000)); service.execute(new R(2000)); service.execute(new R(2000)); service.execute(new R(2000)); service.execute(new R(2000)); //由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出 System.in.read(); } static class R implements Runnable { int time; R(int time) { this.time = time; } @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(time + " " + Thread.currentThread().getName()); } } } 复制代码
cpu核心:4 1000 ForkJoinPool-1-worker-1 2000 ForkJoinPool-1-worker-0 2000 ForkJoinPool-1-worker-3 2000 ForkJoinPool-1-worker-2 2000 ForkJoinPool-1-worker-1
程序分析 ForkJoinPool-1-worker-1
任务的执行时间是1秒,它会最先执行完毕,然后它会去主动执行其他线程队列中的任务。
ForkJoinPool
可以将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。 ForkJoinPool
提供了如下几个方法用于创建 ForkJoinPool
实例对象:
ForkJoinPool(int parallelism)
:创建一个包含parallelism个并行线程的 ForkJoinPool
,parallelism的默认值为 Runtime.getRuntime().availableProcessors()
方法的返回值 ForkJoinPool commonPool()
:该方法返回一个通用池,通用池的运行状态不会受 shutdown()
或 shutdownNow()
方法的影响。
创建了 ForkJoinPool
示例之后,就可以调用 ForkJoinPool
的 submit(ForkJoinTask task)
或 invoke(ForkJoinTask task)
方法来执行指定任务了。其中 ForkJoinTask
(实现了Future接口)代表一个可以并行、合并的任务。 ForkJoinTask
是一个抽象类,他还有两个抽象子类: RecursiveAction
和 RecursiveTask
。其中 RecursiveTask
代表有返回值的任务,而 RecursiveAction
代表没有返回值的任务。
下面代码演示了使用 ForkJoinPool
对1000000个随机整数进行求和。
public class MyForkJoinPool { static int[] nums = new int[1000000]; static final int MAX_NUM = 50000; static Random random = new Random(); static { for (int i = 0; i < nums.length; i++) { nums[i] = random.nextInt(1000); } System.out.println(Arrays.stream(nums).sum()); } // static class AddTask extends RecursiveAction { // // int start, end; // // AddTask(int start, int end) { // this.start = start; // this.end = end; // } // // @Override // protected void compute() { // if (end - start <= MAX_NUM) { // long sum = 0L; // for (int i = 0; i < end; i++) sum += nums[i]; // System.out.println("from:" + start + " to:" + end + " = " + sum); // } else { // int middle = start + (end - start) / 2; // // AddTask subTask1 = new AddTask(start, middle); // AddTask subTask2 = new AddTask(middle, end); // subTask1.fork(); // subTask2.fork(); // } // } // } static class AddTask extends RecursiveTask<Long> { int start, end; AddTask(int start, int end) { this.start = start; this.end = end; } @Override protected Long compute() { // 当end与start之间的差大于MAX_NUM,将大任务分解成两个“小任务” if (end - start <= MAX_NUM) { long sum = 0L; for (int i = start; i < end; i++) sum += nums[i]; return sum; } else { int middle = start + (end - start) / 2; AddTask subTask1 = new AddTask(start, middle); AddTask subTask2 = new AddTask(middle, end); // 并行执行两个“小任务” subTask1.fork(); subTask2.fork(); // 把两个“小任务”累加的结果合并起来 return subTask1.join() + subTask2.join(); } } } public static void main(String[] args) throws IOException { ForkJoinPool forkJoinPool = new ForkJoinPool(); AddTask task = new AddTask(0, nums.length); forkJoinPool.execute(task); long result = task.join(); System.out.println(result); forkJoinPool.shutdown(); } } 复制代码
上面我们说到过:其实常用Java线程池都是由 ThreadPoolExecutor
或者 ForkJoinPool
两个类生成的,只是其根据构造函数传入不同的实参来生成相应线程池而已。那我们现在一起来看看Executors中几个创建线程池对象的静态方法相关的源码:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { 复制代码
corePoolSize maximumPoolSize keepAliveTime unit workQueue
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 复制代码
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } 复制代码
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 复制代码
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } 复制代码
public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } 复制代码
觉得文章写得不错的朋友可以点赞、转发、加关注呀!你们的支持就是我最大的动力,笔芯!