public class ThreadPoolExecutor extends AbstractExecutorService { // 包含 4 个构造方法。 其他 3 个通过调用该构造方法。 public ThreadPoolExecutor( int corePoolSize, // 核心线程数量 int maximumPoolSize, // 最大线程数 long keepAliveTime, // 线程没有执行任务时最大保持多久终止 TimeUnit unit, // keepAliveTime的时间单位 BlockingQueue<Runnable> workQueue, // 阻塞队列,存储等待执行的任务 ThreadFactory threadFactory, // 线程工厂,用来创建线程 RejectedExecutionHandler handler // 当拒绝处理任务时的策略 ) { } } 复制代码
核心线程数量。
默认情况下,创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列(workQueue)当中。
最大线程数。在线程池中最多能创建多少个线程。
线程没有执行任务时最大保持多久终止。
默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用。
调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0。
参数 keepAliveTime 的时间单位
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒 复制代码
阻塞队列,存储等待执行的任务,对线程池运行过程产生重大影响。
线程工厂,主要用来创建线程。
当拒绝处理任务时的策略。
// 丢弃任务,并抛出 RejectedExecutionException ThreadPoolExecutor.AbortPolicy // 丢弃任务,不抛出异常 ThreadPoolExecutor.DiscardPolicy // 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.DiscardOldestPolicy // 调用线程处理该任务 ThreadPoolExecutor.CallerRunsPolicy 复制代码
public class ThreadPoolExecutor extends AbstractExecutorService { // 提交任务,交给线程池执行 public void execute(Runnable command) {} // 提交任务,能够返回执行结果 (execute + Future) public Future<?> submit(Runnable task) {} public <T> Future<T> submit(Runnable task, T result) {} public <T> Future<T> submit(Callable<T> task) {} // 关闭线程池,等待任务执行完 public void shutdown() {} // 立即关闭,不等待任务关闭 public void shutdownNow() {} // 获得线程池中已执行和未执行的任务总数 public long getTaskCount() {} // 获得已完成任务数量 public long getCompletedTaskCount() {} // 线程池当前的线程数量 public int getPoolSize() {} // 获得当前线程池中正在执行的线程数量 public int getActiveCount() { } 复制代码
利用Executors提供的通用线程池创建方法,创建不同配置的线程池,主要区别在于不同的ExecutorService类型或者不同的初始参数。
public static void main(String[] args) { ExecutorService pool = Executors.newSingleThreadExecutor(); // 执行过程将会顺序输出 0 --> 9 for (int i = 0; i < 10; i++) { final int index = i; executorService.execute(() -> { System.out.println(index); }) } } 复制代码
// corePoolSize = 1 // maximumPoolSIze = 1 // keyAliveTime = 0L // unit = TimeUnit.MILLISECONDS // workQueue = new LinkedBlockingQueue<Runnable>() // threadFactory = Executors.defaultThreadFactory() // handler = defaultHandler = new AbortPolicy() public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } 复制代码
public static void main(String[] args) { // 线程池大小为3 ExecutorService executorService = Executors.newFixedThreadPool(3); // 每隔秒打印三个数字 for (int i = 0; i < 50; i++) { final int index = i; executorService.execute(() -> { System.out.println(index); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } executorService.shutdown(); } 复制代码
// corePoolSize = nThreads // maximumPoolSIze = nThreads // keyAliveTime = 0L // unit = TimeUnit.MILLISECONDS // workQueue = new LinkedBlockingQueue<Runnable>() // threadFactory = Executors.defaultThreadFactory() // handler = defaultHandler = new AbortPolicy() public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 复制代码
public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int index = i; executorService.execute(new Runnable() { @Override public void run() { System.out.println(index); } }); } executorService.shuntdown(); } 复制代码
// corePoolSize = 0 // maximumPoolSIze = Integer.MAX_VALUE // keyAliveTime = 60L // unit = TimeUnit.MILLISECONDS // workQueue = new LinkedBlockingQueue<Runnable>() // threadFactory = Executors.defaultThreadFactory() // handler = defaultHandler = new AbortPolicy() public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 复制代码
public static void main(String[] args) { // new ScheduledThreadPoolExecutor(corePoolSize) ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5); // executorService 执行具有调度含义 // delay: 3 SECONDS 后执行 executorService.schedule(() -> System.out.println("schedule running"), 2, TimeUnit.SECONDS); executorService.shutdown(); } 复制代码
public static void main(String[] args) { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5); // executorService 执行具有调度含义 // scheduleAtFixedRate 以指定的速率运行 每隔一段时候就触发 // 1: initalDelay 延迟1秒 // 3: period 每格3秒 executorService.scheduleAtFixedRate(() -> System.out.println(System.nanoTime()), 1, 3, TimeUnit.SECONDS); // 不适用关闭线程池 // 若需要关闭线程池,可通过提供关闭信息,再调用该方法 // executorService.shutdown(); } 复制代码
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } // --- // DEFAULT_KEEPALIVE_MILLIS = 10L // MILLISECONDS = TimeUnit.MILLI_SCALE public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); } 复制代码
Java 8才加入这个创建方法,其内部会构建ForkJoinPool,利用Work-Stealing算法,并行地处 理任务(默认为主机CPU的可用核心数),不保证处理顺序
public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } // --- public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); } 复制代码