本文主要分析Dubbo线程池的构建过程,主要介绍官方文档中有关于ThreadPool的种类:
各种类型的线程池,内部就是根据规则创建不同的ThreadPoolExecutor对象,那我们先简单回顾一下线程池的基本知识,其构造方法如下所示:
public ThreadPoolExecutor( int corePoolSize, // 线程池核心线程数、常驻线程数。 int maximumPoolSize, // 线程池中最大线程数量 long keepAliveTime, // 线程保持活跃时间,(如果线程创建,并空闲 //指定值后,线程会被回收,0表示不开启该特性,其范围针对 // corePoolSize的线程) TimeUnit unit, // keepAliveTime的时间单位。 BlockingQueue< Runnable> workQueue,// 任务队列 ThreadFactory threadFactory, // 线程工厂类,一般通过该线程工厂,为线程命名,以便区分线程。 RejectedExecutionHandler handler) // 拒绝策略。 复制代码
提交任务流程(线程创建流程)
public class FixedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } } 复制代码
实现要点:
这里再简单介绍如果队列长度为0(默认),为什么是选用SynchronousQueue队列。 SynchronousQueue的一个简单理解:调用offer、put之前,必须先调用take,也就是先调用take方法的线程阻塞,然后当别的线程调用offer之后,调用take的线程被唤醒,如果没有线程调用take方法,一个线程调用offer方法,则会返回false,并不会将元素添加到SynchronousQueue队列中,因为SynchronousQueue内部的队列长度为0。 与该线程池相关的配置属性:threadname、theadpool、threads、queues。
public class CachedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } } 复制代码
实现要点:既然要实现线程可以被回收,则必然要设置 keepAliveTime。 故对应线程池核心参数设置,对应如下:
public class LimitedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } } 复制代码
就不回收,与cached不同的就是 keepAliveTime 的取值不同,limited 取值为:Long.MAX_VALUE,其他与 cached 相同。
其核心实现主要由 TaskQueue、EagerThreadPoolExecutor 共同完成。 首先,我们关注一下 TaskQueued 的 offer方法。
public boolean offer(Runnable runnable) { if (executor == null) { throw new RejectedExecutionException("The task queue does not have executor!"); } int currentPoolThreadSize = executor.getPoolSize(); // @1 // have free worker. put task into queue to let the worker deal with task. if (executor.getSubmittedTaskCount() < currentPoolThreadSize) { // @2 return super.offer(runnable); } // return false to let executor create new worker. if (currentPoolThreadSize < executor.getMaximumPoolSize()) { // @3 return false; } // currentPoolThreadSize >= max // @4 return super.offer(runnable); } 复制代码
代码@1:获取当前线程池中线程的数量。
代码@2:如果当前已提交到线程池中的任务数量小于当前存在在的线程数,则走默认的提交流程。
代码@3:如果当前已提交到线程中的数量大于当前的线程池,并线程池中数量并未达到线程池允许创建的最大线程数时,则返回false,并不入队,其效果是会创建新的线程来执行。
代码@4:如果当前线程池中的线程已达到允许创建的最大线程数后,走默认的提交任务逻辑。 EagerThreadPoolExecutor#execute
public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } // do not increment in method beforeExecute! submittedTaskCount.incrementAndGet(); // @1 try { super.execute(command); } catch (RejectedExecutionException rx) { // retry to offer the task into queue. final TaskQueue queue = (TaskQueue) super.getQueue(); try { if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException("Queue capacity is full."); } } catch (InterruptedException x) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException(x); } } catch (Throwable t) { // decrease any way submittedTaskCount.decrementAndGet(); // @2 } } 复制代码
其核心实现逻辑:如果提交任务失败,则再走一次默认的任务提交流程。 最总后结一下Eager的核心特性。
public class EagerThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); // init queue and executor TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues); EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, taskQueue, new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); taskQueue.setExecutor(executor); return executor; } } 复制代码
其核心特性如下:
作者介绍:丁威,《RocketMQ技术内幕》作者,RocketMQ 社区优秀布道师、CSDN2019博客之星TOP10,维护公众号: 中间件兴趣圈 目前已陆续发表源码分析Java集合、Java 并发包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源码专栏。可以点击链接加入 中间件知识星球 ,一起探讨高并发、分布式服务架构,交流源码。