线程池是一种生产者 - 消费者模式,线程池的使用方是生产者,线程池本身是消费者。我们可以通过下面的代码来理解线程池的工作原理。
public class ThreadPoolDemo { //利用阻塞队列实现生产者-消费者模式 BlockingQueue<Runnable> workQueue; //保存内部工作线程 List<WorkerThread> threads = new ArrayList<>(); private ThreadPoolDemo(int poolSize, BlockingQueue<Runnable> workQueue) { this.workQueue = workQueue; // 创建工作线程 for (int idx = 0; idx < poolSize; idx++) { WorkerThread work = new WorkerThread(); work.start(); threads.add(work); } } // 提交任务 public void execute(Runnable command) throws InterruptedException { workQueue.put(command); } // 工作线程负责消费任务,并执行任务 class WorkerThread extends Thread { public void run() { //循环取任务并执行 while (true) { Runnable task = null; try { task = workQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } assert task != null; task.run(); } } } public static void main(String[] args) throws InterruptedException { BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2); // 创建线程池 ThreadPoolDemo pool = new ThreadPoolDemo(10, workQueue); // 提交任务 pool.execute(()->{ System.out.println("hello world"); }); } } 复制代码
在 ThreadPoolDemo 的内部,维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中。ThreadPoolDemo 内部维护的工作线程会消费 workQueue 中的任务并执行任务,相关的代码就是代码中的 while 循环。线程池主要的工作原理也就这些。
Java 并发包里提供的线程池,远比上面的示例代码强大得多,也复杂得多。Java 提供的线程池相关的工具类中,最核心的是 ThreadPoolExecutor,通过名字你也能看出来,它强调的是 Executor。
ThreadPoolExecutor 的构造函数非常复杂,如下面代码所示,这个最完备的构造函数有 7 个参数。
ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {} 复制代码
下面介绍这些参数的意义, 你可以把线程池类比为一个项目组,而线程就是项目组的成员。
corePoolSize:表示线程池保有的最小线程数。有些项目很闲,但是也不能把人都撤了,至少要留 corePoolSize 个人坚守阵地。
maximumPoolSize:表示线程池创建的最大线程数。当项目很忙时,就需要加人,但是也不能无限制地加,最多就加到 maximumPoolSize 个人。当项目闲下来时,就要撤人了,最多能撤到 corePoolSize 个人。
keepAliveTime & unit:上面提到项目根据忙闲来增减人员,那在编程世界里,如何定义忙和闲呢?很简单,一个线程如果在一段时间内,都没有执行任务,说明很闲,keepAliveTime 和 unit 就是用来定义这个“一段时间”的参数。也就是说,如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了。
workQueue:工作队列,和上面示例代码的工作队列同义。threadFactory:通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。
handler:通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。ThreadPoolExecutor 已经提供了以下 4 种策略。
1. CallerRunsPolicy :提交任务的线程自己去执行该任务。
2. AbortPolicy :默认的拒绝策略,会 throws RejectedExecutionException。
3. DiscardPolicy :直接丢弃任务,没有任何异常抛出。
4. DiscardOldestPolicy :丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
Java 在 1.6 版本还增加了 allowCoreThreadTimeOut(boolean value) 方法,它可以让所有线程都支持超时,这意味着如果线程池很闲,就会关闭线程池中的线程。
1.大厂开发规范,线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
1)FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
2.使用线程池,还要注意异常处理的问题,例如通过 ThreadPoolExecutor 对象的 execute() 方法提交任务时,如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止;不过,最致命的是任务虽然异常了,但是你却获取不到任何通知,这会让你误以为任务都执行得很正常。虽然线程池提供了很多用于异常处理的方法,但是最稳妥和简单的方案还是捕获所有异常并按需处理,你可以参考下面的示例代码。
try { //业务逻辑 } catch (RuntimeException x) { //按需处理 } catch (Throwable x) { //按需处理 } 复制代码
3.使用线程池的时候最好设置业务相关的名称,有利于排查业务问题。