上一篇我们简单描述了Executor框架的结构,本篇正式开始并发包中部分源码的解读。
我们知道,目前主流的商用虚拟机在线程的实现上可能会有所差别。但不管如何实现,在开启和关闭线程时一定会耗费很多CPU资源,甚至在线程的挂起和恢复JDK1.6都做了自旋锁的优化。所以,使用线程池来管理和执行多线程任务会大大提高程序执行效率。关于使用线程池的优点这里不做过多说明,我们直接进入Java5并发包中ThreadPoolExecutor的实现的源码。
在解读源码前,我们先来看看创建线程池的一般做法和线程池的几种类别:
1 Executors.newFixedThreadPool(int nThreads); // 创建一个固定线程数的线程池 2 Executors.newScheduledThreadPool(int nThreads); // 创建一个可对线程进行时间调度的线程池 3 Executors.newCachedThreadPool(); // 创建一个可缓冲的无线程数量界限(Integer.MAX_VALUE)的线程池 4 Executors.newSingleThreadExecutor(); // 创建一个可复用的单一线程的线程池
我们重点来看1、3、4条,在Executors中如何实现的
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
可以看到,差别只是ThreadPoolExecutor的构造方法的参数不同,下面来看看ThreadPoolExecutor的构造方法的参数(按顺序):
从参数说明中看出,1、3、4中的线程池主要是“核心线程数”和“最大线程数”的差别,而keepAliveTime和workQueue的差别是由“核心线程数”和“最大线程数”是否相等来决定的。那么“核心线程数”和“最大线程数”分别代表什么?带着这个疑问进入execute方法,源码如下:
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { 5 if (runState == RUNNING && workQueue.offer(command)) { 6 if (runState != RUNNING || poolSize == 0) 7 ensureQueuedTaskHandled(command); 8 } 9 else if (!addIfUnderMaximumPoolSize(command)) 10 reject(command); // is shutdown or saturated 11 } 12 }
第4行 的代码表达一件事:当线程池中当前线程数小于核心线程数时,执行addIfUnderCorePoolSize(command)方法,并且执行成功后 不再 执行后面的逻辑。那我们就来看看这个addIfUnderCorePoolSize(command)方法做了什么:
1 /** 2 * Creates and starts a new thread running firstTask as its first 3 * task, only if fewer than corePoolSize threads are running 4 * and the pool is not shut down. 5 * @param firstTask the task the new thread should run first (or 6 * null if none) 7 * @return true if successful 8 */ 9 private boolean addIfUnderCorePoolSize(Runnable firstTask) { 10 Thread t = null; 11 final ReentrantLock mainLock = this.mainLock; 12 mainLock.lock(); 13 try { 14 if (poolSize < corePoolSize && runState == RUNNING) 15 t = addThread(firstTask); 16 } finally { 17 mainLock.unlock(); 18 } 19 if (t == null) 20 return false; 21 t.start(); 22 return true; 23 }
方法注释的主要意思是:当运行线程少于核心线程时,就创建并运行一个新的线程。代码的 第15行 创建了一个新的线程, 第21行 运行了这个线程。接下来看看如何创建的这个线程:
1 private Thread addThread(Runnable firstTask) { 2 Worker w = new Worker(firstTask); 3 Thread t = threadFactory.newThread(w); 4 if (t != null) { 5 w.thread = t; 6 workers.add(w); 7 int nt = ++poolSize; 8 if (nt > largestPoolSize) 9 largestPoolSize = nt; 10 } 11 return t; 12 }
第二行可以看到, 线程池中真正执行的线程是由名为Worker的内部类来执行的 ,关于Worker的主要结构和方法如下:
( 注:addThread方法的注释中强调了要在持有mainLock的锁时才能调用,mainLock锁在线程池的安全并发的实现中担任着非常重要的角色,并且对于firstTask,有一点不同的逻辑在,由于篇幅有限,本文这里不做重点解读了 )
1 private final class Worker implements Runnable { 2 3 // others codes 4 5 /** 6 * Main run loop 7 */ 8 public void run() { 9 try { 10 Runnable task = firstTask; 11 firstTask = null; 12 while (task != null || (task = getTask()) != null) { 13 runTask(task); 14 task = null; 15 } 16 } finally { 17 workerDone(this); 18 } 19 } 20 }
可以看到,Worker实现了Runnable接口,线程池中执行的线程其实是Worker的run()方法。而 第13行 的runTask(task)方法的实现是直接调用了提交到线程池中的Runnable任务的run方法(具体代码请自行查看源码,这里不再列出,其中还包含一些针对shutdown和shutdownNow的逻辑),还有比较重要的是 第12行 的getTask()方法,最后来看getTask()的源码:
1 Runnable getTask() { 2 for (;;) { 3 try { 4 int state = runState; 5 if (state > SHUTDOWN) 6 return null; 7 Runnable r; 8 if (state == SHUTDOWN) // Help drain queue 9 r = workQueue.poll(); 10 else if (poolSize > corePoolSize || allowCoreThreadTimeOut) 11 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); 12 else 13 r = workQueue.take(); 14 if (r != null) 15 return r; 16 if (workerCanExit()) { 17 if (runState >= SHUTDOWN) // Wake up others 18 interruptIdleWorkers(); 19 return null; 20 } 21 // Else retry 22 } catch (InterruptedException ie) { 23 // On interruption, re-check runState 24 } 25 } 26 }
以上代码 第13行 将线程池保持线程不关闭的实现已经展示出来了: 由一个死循环不断的从队列中取出提交到线程池中的Runnable任务,然后直接调用其run()方法即可 。
基于这个原理,我们就会很容易的看懂其它的一些特性。
让我们先回头看看关于“核心线程”的源码,回到最开始的execute()的源码:
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { 5 if (runState == RUNNING && workQueue.offer(command)) { 6 if (runState != RUNNING || poolSize == 0) 7 ensureQueuedTaskHandled(command); 8 } 9 else if (!addIfUnderMaximumPoolSize(command)) 10 reject(command); // is shutdown or saturated 11 } 12 }
前面我们说根据 第4行 ,当线程池中当前线程数小于核心线程数时,执行addIfUnderCorePoolSize(command)方法并不再执行后面的代码。而当当前线程数大于等于核心线程数时,就会直接执行 第5行 的workQueue.offer(command),将新任务添加到名为workQueue队列中,也就是死循环中不断取Runnable任务的队列。这里这个workQueue是由构造方法传进来的workQueue队列。通过Executors创建线程池的1、3、4条种类可以看出,核心线程=最大线程的线程池,使用最大容量(Integer.MAX_VALUE)的LinkedBlockingQueue队列,就是说,线程池无法扩展,超出的Runnable任务全部进入阻塞队列中,等待Worker执行完。而核心线程<最大线程的线程池,使用无容量的SynchronousQueue队列,就是说,线程池可以无限扩展,扩展的线程全部新建Worker并执行。但根据getTask()方法的 第10行 和 第11行 ,超出核心线程数的Worker,空闲时只会存活keepAliveTime时间(构造方法的参数)。
OK,到这里,通过源码已经解释了ThreadPoolExecutor线程池主要的特性的实现原理。
上面罗里吧嗦的一大堆主要说明了JDK源码中实现的ThreadPoolExecutor线程池的以下几个主要特性(来自JDK API的描述):
核心线程数与最大线程数的意义:
ThreadPoolExecutor将根据corePoolSize和maximumPoolSize设置的边界自动调整池大小。当新任务在方法 execute(java.lang.Runnable)中提交时,如果运行的线程少于corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。如果运行的线程多于corePoolSize而少于maximumPoolSize,则仅当队列满时才创建新线程。如果设置的corePoolSize和maximumPoolSize相同,则创建了固定大小的线程池。如果将 maximumPoolSize设置为基本的无界值(如Integer.MAX_VALUE),则允许池适应任意数量的并发任务。
保持活动时间:
如果池中当前有多于 corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止
排队:
所有 BlockingQueue
都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:
然后终止线程池的几种方式及被拒绝的任务由构造方法传入的handler处理等特性本文并未给出源码解读,感兴趣的读者可自行查看JDK源码。
本文通过部分关键处源码的解读,介绍了ThreadPoolExecutor线程池的实现原理。我个人简单总结为两点:
这两点只是做概括,真正展开来描述,还是有很多细节的。