通常都是使用Executors提供的通用线程池创建方法去创建不同配置的线程池,主要区别在于不同的ExecutorService类型或者不同的初始参数,主要分为五类:
void execute(Runnable command); <T> Future<T> submit(Callable<T> task);
线程池:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
private final BlockingQueue<Runnable> workQueue; private final HashSet<Worker> workers = new HashSet<>();
ctl变量被赋予了双重角色,高位代表了线程池状态,低位代表了工作线程数目。这是一个典型的高效优化。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 真正决定了工作线程数的理论上限 private static final int COUNT_BITS = Integer.SIZE - 3; private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // 线程池状态,存储在数字的高位 private static final int RUNNING = -1 << COUNT_BITS; … // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~COUNT_MASK; } private static int workerCountOf(int c) { return c & COUNT_MASK; } private static int ctlOf(int rs, int wc) { return rs | wc; }
线程的状态流转:
注意,实际java代码中不存在idle状态。
public void execute(Runnable command) { … int c = ctl.get(); // 检查工作线程数目,低于 corePoolSize 则添加 Worker if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // isRunning 就是检查线程池是否被 shutdown // 工作队列可能是有界的,offer 是比较友好的入队方式 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次进行防御性检查 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 尝试添加一个 worker,如果失败意味着已经饱和或者被 shutdown 了 else if (!addWorker(command, false)) reject(command); }