比如现在收集上的修图软件。一张 1920 x1080 的图片有 200多万个像素点,对整个图片的每个像素点处理一遍也是需要不少的计算量。
服务器端处理大数据、大量请求时如果只是单个线程来进行,也是无法满足需求的。
此外,不管是处理应用还是服务器,即使使用了多线程,如果频繁进行创建和销毁线程,最终创建和销毁的时间有可能大于真正执行的时间。将对象复用是一个不错的选择,因此可以选择线程池。
线程池涉及到的常用的 Java 类包括 ThreadPoolExecutor、Executors 等。
(1)ThreadPoolExecutor 创建对象较为复杂,如下图是在 JDK 11 中提供的几个构造函数。
这些参数主要涉及到线程池一些关键参数的设置。以参数最多的这个构造函数为例:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... } 复制代码
corePoolSize
空闲时线程池中保存的线程数。
maximumPoolSize
线程池中允许的最大线程数。
keepAliveTime
当有空闲线程时,且现有的线程数大于 corePoolSize 在超过该指定的时间则删除该线程,在指定的时间内并不删除线程。
unit
指设置的 keepAliveTime 的时间单位。
workQueue
执行前用来保存任务队列。
threadFactory
指定创建自定义线程的工厂类。
handler
RejectedExecutionHandler 的作用是当线程池关闭之后如有任务加入的时候,可以实现一些处理,例如将被拒绝的线程信息记录下来。
(2)execute() 和 submit() 方法
两者都是能像线程池提交任务的,但是细节会有不同。
execute()
可以添加一个任务,但是类型只能是Runnable 类型,且该任务不能返回结果。当遇到自线程异常时,直接打印异常信息,主线程无法捕获异常信息。
submit()
可以添加一个任务,但是返回一个 Future 对象,并能够通过 Future 获得返回结果。当遇到异常时,能够在主线程中捕获异常信息。
submit() 方法的参数涉及到两类 callable 接口和 Runnable ,它们的主要区别是:
(a)Callable 接口的 call() 方法可有返回值,而 Runnable 接口的 run() 方法没有返回值。
(b)Callable 接口的 call() 方法可以生命抛出异常,而 Runnable 接口的 run() 方法不可以声明抛出异常。
(3)其他常用方法
shutdown()
主线程立即结束,且不会阻塞,线程池中为执行完的线程会继续执行,线程池中不在添加新的 Task,线程池会继续运行知道线程都执行完成。
当线程会立即编程 SHUTDOWN 状态,此时不再想线程池中添加 Task,否则抛出 RejectedExecutionException 异常。
shutdownNow()
中断所有的任务,并抛出 InterruptException 异常。执行后立即进入 STOP 状态,并试图停止所有正在执行的线程,不再处理还在线程池中等待的任务。
该方法执行后会返回未执行的任务:List
isShutdown()
判断线程池是否已经关闭。
isTerminating()
是否正在终止的过程中时,返回 true。
isTerminated()
判断所有任务都已经完成。
awaitTermination()
查看在指定的时间之间,线程池是否已经终止工作。
Executors 是线程池的创建类,能够方便通过已经提供的方法的创建线程池。
(1)Executors 提供的常用方法:
newCachedThreadPool()
创建无边界的线程池,可以进行线程自动回收,所谓“最大线程池”就是池中存放线程数位理论上的最大值, Integer.MAX_VALUE 最大值。
newCachedThreadPool(ThreadFactory)
自定义线程池中的线程
newFixedThreadPool(int)
创建有边界的线程池
newFixedThreadPool(int, ThreadFactory)
创建自定义线程的有边界线程池。
newSingleThreadExector()
创建单一线程池,可实现以队列的方式执行任务。
newSingleThreadExecutor()
用工厂方式创建单一线程池。
package com.page.concurrent.pool; import java.util.concurrent.*; public class Game { public static void main(String[] args) throws ExecutionException, InterruptedException { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 1, 2, 1000 * 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); Future<String> future = threadPoolExecutor.submit(() -> { System.out.println("Should work 4 seconds."); Thread.sleep(1000 * 4); System.out.println("Work done."); return "OK"; }); String result = future.get(); System.out.println("Thread response result is " + result); threadPoolExecutor.shutdownNow(); System.out.println("Thread pool status: isShutdown=" + threadPoolExecutor.isShutdown()); Thread.sleep(1000 * 5); System.out.println("Thread pool status: isTerminated" + threadPoolExecutor.isTerminated()); } } 复制代码
package com.page.concurrent.pool; import java.util.concurrent.*; public class Game { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); Future<String> future = executorService.submit(() -> { System.out.println("Should work 4 seconds."); Thread.sleep(1000 * 4); System.out.println("Work done."); return "OK"; }); String result = future.get(); System.out.println("Thread response result is " + result); executorService.shutdownNow(); System.out.println("Thread pool status: isShutdown=" + executorService.isShutdown()); Thread.sleep(1000 * 5); System.out.println("Thread pool status: isTerminated" + executorService.isTerminated()); } } 复制代码