在并发相关,不仅仅依靠之前介绍的各种锁或者队列操作,同时我们也需要考虑到资源的消耗情况(力扣上各种题目比消耗与时间。。)。这个时候我们就引入了线程池。
针对于大家熟悉的 Executors
进行入手,我们经常性的使用里面的线程池。当然,根据阿里巴巴的规范手册上来说,不建议我们直接通过这个类去创建一个线程池,需要通过 ThreadPoolExecutor
自行去创建,这样会让我们懂得线程池中的线程各个时间的状态变化,以防止线程池中的线程异常。
Executors
newFixedThreadPool(int nThreads) newSingleThreadExecutor() newCachedThreadPool() newScheduledThreadPool(int corePoolSize)
前三个里面的实现都是利用了 ThreadPoolExecutor
,只不过传入的参数是不同的,然后造就了不同的线程池,接下来就看看 ThreadPoolExecutor
里面参数的各个含义。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 复制代码
corePoolSize maximumPoolSize keepAliveTime unit workQueue
默认会帮我们填充的两个参数:
threadFactory
线程工厂,用于产生线程放入到线程池中 handler
拒绝策略处理器,用于当前线程池中拒绝多余的任务等,默认是 AbortPolicy
拒绝策略 线程工厂里面就会帮我们产生一个个线程放入到线程池中:
public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon())//是否为守护线程 t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY)//是否为默认的优先级 t.setPriority(Thread.NORM_PRIORITY); return t; } 复制代码
在线程池中的 execute(Runnable command)
方法是会去帮助我们去执行我们所需要的任务:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get();//ctl是一个包装的原子类,里面包含了线程的数量以及状态 if (workerCountOf(c) < corePoolSize) {//工作线程数量小于当前的核心线程数 if (addWorker(command, true))//加入到队列中,并且true代表使用核心线程 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) {//判断线程池是否在运行,同时将任务加入到队列中 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command))//如果线程池不是运行状态则进行拒绝任务操作 reject(command); else if (workerCountOf(recheck) == 0)//如果线程数为0,则开辟一个线程去执行,不采用核心线程 addWorker(null, false); } else if (!addWorker(command, false))//加入队列失败则进行线程数增加,这里采用的线程是大于核心线程数小于最大线程数。 reject(command); } 复制代码
ForkJoinTask
这是一个比较特殊的线程池,可以将一个很大的任务进行分解成为若干个小的任务去执行。执行完成之后再将每个任务的结果进行整合返回。底下派生出两个抽象类: RecursiveAction
是没有返回值的,只是将任务划分去执行。 RecursiveTask
是有返回值的,将任务执行完成之后结果整合进行返回。
package com.montos.lock; import java.util.ArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; public class CountTask extends RecursiveTask<Long> { private static final long serialVersionUID = 1L; private static final int THRESHOLD = 10000; private long start; private long end; public CountTask(long start, long end) { super(); this.start = start; this.end = end; } @Override protected Long compute() { long sum = 0; boolean canCompute = (end - start) < THRESHOLD;//阈值判断 if (canCompute) { for (long i = start; i <= end; i++) { sum += i; } } else { long step = (start + end) / 100; ArrayList<CountTask> subTasks = new ArrayList<CountTask>(); long pos = start; for (int i = 0; i < 100; i++) { long lastOne = pos + step; if (lastOne > end) lastOne = end; CountTask subTask = new CountTask(pos, lastOne); pos += step + 1; subTasks.add(subTask); subTask.fork();//子线程进行求解 } for (CountTask t : subTasks) { sum += t.join();//返回所有的结果集进行求和返回 } } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(0, 200000l); ForkJoinTask<Long> result = forkJoinPool.submit(task); try { Long res = result.get(); System.out.println("result is :" + res); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } } 复制代码
上面是有返回值的demo,执行完之后控制台会会出现 result is :20000100000
这个返回结果。
有些小伙伴可能会遇到两个错误(修改里面的参数值,而导致出现的问题,这里我就遇到了!!):
java.util.concurrent.ExecutionException:java.lang.StackOverflowError
。原因是因为 ForkJoin
不会对堆栈进行控制,编写代码时注意方法递归不能超过jvm的内存,如果必要需要调整jvm的内存:在Eclipse中JDK的配置中加上 -XX:MaxDirectMemorySize=128
(默认是64M)。改为128后不报栈溢出,但是报下一个错。 java.lang.NoClassDefFoundError: Could not initialize class java.util.concurrent.locks.AbstractQueuedSynchronizer$Node
。这个导致的原因是因为子任务的处理长度不平衡。我们需要对原来的长度进行计算处理。 至此 JDK
中大部分的并发类都谈及到用法,对于底层代码的描述和处理,这块期待我之后的文章。