谈到线程池,很多人会想到Executors提供的一些预设的线程池,比如单线程线程池 SingleThreadExecutor
,固定大小的线程池 FixedThreadPool
,但是很少有人会注意到其中还提供了一种特殊的线程池: WorkStealingPool
,我们点进这个方法,会看到和其他方法不同的是,这种线程池并不是通过 ThreadPoolExecutor
来创建的,而是 ForkJoinPool
来创建的:
public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } 复制代码
这两种线程池之间并不是继承关系,而是平级关系:
ThreadPoolExecutor应该都很了解了,就是一个基本的存储线程的线程池,需要执行任务的时候就从线程池中拿一个线程来执行。而ForkJoinPool则不仅仅是这么简单,同样也不是ThreadPoolExecutor的代替品,这种线程池是为了实现“ 分治法”这一思想而创建的,通过把大任务拆分成小任务,然后再把小任务的结果汇总起来就是最终的结果,和MapReduce的思想很类似
举个例子,我们要统计1-100的累加和,如果使用ForkJoinPool来实现的话,就可以将1-100每5位划分一段,划分出20段,当作20个任务,每个任务只计算自己区间内的结果,最后将这20个任务的结果汇总起来就是1-100的累加和
ForkJoinPool的本质就是两点:
接来下我们来做一个1-100的累加例子:
class Task extends RecursiveTask<Integer> { private int start; private int end; private int mid; public Task(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; if (end - start < 6) { // 当任务很小时,直接进行计算 for (int i = start; i <= end; i++) { sum += i; } System.out.println(Thread.currentThread().getName() + " count sum: " + sum); } else { // 否则,将任务进行拆分 mid = (end - start) / 2 + start; Task left = new Task(start, mid); Task right = new Task(mid + 1, end); // 执行上一步拆分的子任务 left.fork(); right.fork(); // 拿到子任务的执行结果 sum += left.join(); sum += right.join(); } return sum; } } 复制代码
这里的 RecursiveTask
是 ForkJoinTask
的子类, ForkJoinTask
又是 Future
的子类,不了解Future类的可以认为是一个异步执行,并且可以有返回值的Runnable类
我们首先在Task类中定义了任务需要的一些数据,比如开始位置和结束位置。重点是其中的compute方法,在其中实现了我们刚才说到的步骤,如果任务很小(通过任务数据来判断),就进行计算,否则将任务拆分,使用fork()执行,并通过join()拿到计算结果
刚才我们定义了任务类,接下来就需要把这个任务提交到线程池:
public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(); Task countTask = new Task(1, 100); ForkJoinTask<Integer> result = forkJoinPool.submit(countTask); System.out.println("result: " + result.get()); forkJoinPool.shutdown(); } 复制代码
注意,这里ForkJoinPool初始化可以传入一个并行参数,如果不传入该参数的话会默认使用处理器个数来作为并行参数
创建任务对象和线程池之后,使用submit方法来提交任务,该方法会返回一个 ForkJoinTask<T>
类型的对象,调用其get方法即可拿到执行结果
同时要注意,该线程池也需要调用shutdown方法来关闭
ForkJoinPool中有三个重要角色:
在线程池中,任务队列使用数组来保存,其中保存了所有提交进来的任务:
submission
指的是本地提交的任务,如submit、execute提交的任务;而 task
则是通过fork方法添加的子任务。这两种任务仅仅在含义上有所区别,所以一同保存在任务队列中,通过位置进行区分
想理解ForkJoinPool的原理,就要理解其核心,一共有两点,其一是 分治法 ,其二就是 工作窃取算法 。分治法相信就不用多说了,就是通过把大任务拆分成小任务来提高并发度。重点要说的就是工作窃取算法,该算法的原理:
所有线程均尝试找到并执行 已提交的任务 ,或是通过其他任务创建的 子任务
依赖于这种特性,来尽量避免一个线程执行完自己的任务后“无所事事”的情况。同时,窃取顺序是FIFO的