测试demo, ThreadPoolExecutorTest:
public class ThreadPoolExecutorTest { public static void main(String[] args) throws InterruptedException { final boolean isFair = false; ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<Runnable>(10, isFair); // arrayBlockingQueue.add(new MyThreadTask(10086)); final int corePoolSize = 3; final int maximumPoolSize = 6; ThreadPoolExecutor threadPool = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, 1, TimeUnit.SECONDS, arrayBlockingQueue, new ThreadPoolExecutor.CallerRunsPolicy()); // threadPool.allowCoreThreadTimeOut(true); // Integer result = 9; for (int index = 1; index <= 10; index++) { Thread tempNewThread = new MyThreadTask(index); threadPool.execute(tempNewThread); // result = threadPool.submit(new MyThreadTask(i), result); } // threadPool.shutdown(); } }
从 ThreadPoolExecutor 源码中抽取出来的一些核心方法(有删减, 未在此处声明的字段和方法请参见 JDK 源码):
/**
 * Abridged view of a thread pool executor: a few fields plus the worker creation,
 * task fetching, worker loop, task submission and caller-runs rejection logic.
 *
 * <p>This is an excerpt, not a complete class. Many members used below (for example
 * {@code ctl}, {@code corePoolSize}, {@code keepAliveTime}, {@code largestPoolSize},
 * {@code runStateOf}, {@code workerCountOf}, {@code isRunning}, {@code addWorkerFailed},
 * {@code processWorkerExit}, {@code reject}) are not declared here.
 */
public class ThreadPoolExecutor extends AbstractExecutorService {

    /** Queue that {@link #execute} offers tasks to and {@code getTask()} reads tasks from. */
    private final BlockingQueue<Runnable> workQueue;

    /**
     * Declared but never acquired anywhere in this excerpt.
     * NOTE(review): presumably meant to guard {@code workers}, as in the full JDK
     * class - confirm before reusing this code.
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /** Workers registered by {@code addWorker}. */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * The runnable that actually executes on a pool thread. Its {@link #run()} simply
     * delegates to {@code runWorker}, which then runs the submitted tasks one by one.
     *
     * <p>{@code runWorker} calls {@code lock()} and {@code unlock()} on this type; those
     * methods are not declared in this excerpt.
     */
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        /** Thread this worker is running in. Null if factory fails. */
        final Thread thread;
        /** Initial task to run. Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Stores the initial task and asks the thread factory for a thread whose
         * runnable is this worker.
         *
         * @param firstTask the first task to run, possibly null
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates to the enclosing pool's {@code runWorker} method. */
        public void run() {
            runWorker(this);
        }
    }

    /**
     * Creates a {@link Worker} for {@code firstTask}, registers it in {@code workers}
     * when the run state permits, and starts its thread.
     *
     * @param firstTask the task the new worker runs first; {@code execute} passes null
     *                  when it only needs a worker to drain the queue
     * @param core      not read in this excerpt; {@code execute} passes true when the
     *                  worker count is below {@code corePoolSize} and false otherwise
     * @return true if the worker's thread was started
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                int rs = runStateOf(ctl.get());

                // Register only while the run state is below SHUTDOWN, or exactly
                // SHUTDOWN with no initial task.
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    workers.add(w);
                    // Remember the largest number of registered workers seen so far.
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // Runs whenever the thread was not started, including when constructing
            // the Worker threw (w is then still null).
            if (!workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    /**
     * Fetches the next task (not a thread) from {@code workQueue}: a timed
     * {@code poll} bounded by {@code keepAliveTime} when {@code timed} is true,
     * otherwise a blocking {@code take}.
     *
     * <p>As written here the loop only exits by returning a non-null task; an
     * {@link InterruptedException} just triggers another attempt.
     * NOTE(review): {@code timed} is not declared in this excerpt, and
     * {@code timedOut} is written but never read - the logic that would return null
     * to retire a worker appears to have been cut out.
     *
     * @return the next task to run
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

    /**
     * Main loop of a worker thread.
     *
     * <p>The loop condition {@code task != null || (task = getTask()) != null} first
     * consumes the worker's initial task and afterwards keeps pulling tasks from
     * {@code workQueue} through {@code getTask()}.
     *
     * <p>Each task is executed by calling {@code task.run()} directly on the current
     * thread (the worker's own thread is already running at this point), between
     * {@code beforeExecute} and {@code afterExecute}, while the worker is locked.
     *
     * <p>{@code processWorkerExit} is always called on the way out;
     * {@code completedAbruptly} stays true when the loop was left by an exception.
     *
     * @param w the worker whose tasks are run
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    // Whatever the task throws is recorded for afterExecute and then
                    // rethrown; a Throwable that is neither a RuntimeException nor an
                    // Error is wrapped in an Error.
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    // Clear the slot so the next iteration fetches from the queue, and
                    // count the task even if it threw.
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

    /**
     * Entry point for handing a task to the pool.
     *
     * <p>Order of attempts: start a new worker while the worker count is below
     * {@code corePoolSize}; otherwise queue the task; otherwise start a new worker
     * with {@code core == false}; if that fails too, reject the task.
     *
     * @param command the task to execute
     */
    public void execute(Runnable command) {
        int c = ctl.get();
        // Fewer workers than corePoolSize: try to start a new worker that runs this
        // task first.
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // Otherwise, while the pool is running, try to put the task into workQueue.
        if (isRunning(c) && workQueue.offer(command)) {
            // Re-read the state after queueing: reject the task if the pool stopped
            // running and the task could still be removed; otherwise make sure at
            // least one worker exists to drain the queue.
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // Not running or the queue refused the task: try one more worker, else reject.
        else if (!addWorker(command, false))
            reject(command);
    }

    /**
     * One of the rejection policies: runs the rejected task directly on the thread
     * that invokes {@code rejectedExecution}, unless the executor has been shut down,
     * in which case the task is silently dropped.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {

        /** Creates a caller-runs policy. */
        public CallerRunsPolicy() { }

        /**
         * Runs {@code r} on the calling thread if {@code e} is not shut down.
         *
         * @param r the rejected task
         * @param e the executor that rejected it
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
}