Spliterator是一个可分割迭代器(Splittable Iterator),JDK8发布后,对于并行处理的能力大大增强,Spliterator就是为了并行遍历&分割序列而设计的一个迭代器。本文对其进行分析。
在 Java7
中引入了 ForkJoinPool
框架使得并行计算变得非常方便,它可以利用多核加速计算,同时不必手工实现复杂的多线程程序,底层直接帮我们实现好。
对于Stream来说,并行流同样是一种快速计算集合数据的方式,它的底层就是依赖 ForkJoinPool
来实现的。
默认 Collection
接口实现了 parallelStream()
default Stream<E> parallelStream() { return StreamSupport.stream(spliterator(), true); }
并行流底层调用 StreamSupport
的工具方法 stream(Spliterator s, boolean parallel)
,其中 Collection
也提供了默认的 spliterator
方下:
default Spliterator<E> spliterator() { return Spliterators.spliterator(this, 0); }
对于子类集合来说应尽量覆盖这个方法以更高效的方式来分割迭代本集合中的数据序列:
The default implementation should be overridden by subclasses that can return a more efficient spliterator.
比如对于 ArrayList
来说,它实现了自己的分割迭代器 ArrayListSpliterator
:
// Creates a late-binding and fail-fast Spliterator over the elements in this list. public Spliterator<E> spliterator() { return new ArrayListSpliterator<>(this, 0, -1, 0); }
它有两个特点:
Late-Binding Fail-Fast
求最大值
int max = numbers.parallelStream().reduce(0, Integer::max, Integer::max); System.out.println("Parallel: " + max);
调用并行流的调用栈如下:
parallelStream()
StreamSupport.stream(spliterator(), true); ArrayList.spliterator() ArrayListSpliterator<>();
由 Spliterator
来实现分割逻辑,一直到无法继续分割,再对子结果集进行合并。
JDK8
中引入的新接口,可以实现对元素序列的遍历和分割,它可以实现并行流计算:
public interface Spliterator<T> { boolean tryAdvance(Consumer<T> action); default void forEachRemaining(Consumer<T> action); Spliterator<T> trySplit(); long estimateSize(); int characteristics(); }
对接口的介绍使用一篇英文博客,摘自 这里 ,这是我找到的最容易理解的解释,比很多中文冗长又不全面的理解强很多:
is a default method available in Spliterator interface that indicates the spliterator to take certain action when no more splitting require. Basically this performs the given action for each remaining element, sequentially in the current thread, until all elements have been processed.
default void forEachRemaining(Consumer<T> action) { do { } while (tryAdvance(action)); }
If you see the forEachRemaining method default implementation, it repeatedly calls the tryAdvance method to process the spliterator elements sequentially. While splitting task when a spliterator finds itself to be small enough that can be executed sequentially then it calls forEachRemaining method on its elements.
trySplit is used to partition off some of its elements to second spliterator allowing both of them to process parallelly. The idea behind this splitting is to allow balanced parallel computation on a data structure. These spliterators repeatedly calls trySplit method unless spliterator returns null indiacating end of splitting process.
estimateSize returns an estimate of the number of elements available in spliterator. Usually this method is called by some forkjoin tasks like AbstractTask to check size before calling trySplit.
这里的例子同样摘自上述博客,通过 Spliterator
实现并行计算整数元素序列的最大值。
public class SpliteratorTest { public static void main(String[] args) { Random random = new Random(100); int[] array = IntStream.rangeClosed(1, 1_000_000).map(random::nextInt) .map(i -> i * i + i).skip(20).toArray(); int max = StreamSupport.stream(new FindMaxSpliterator(array, 0, array.length - 1), true) .reduce(0, Integer::max, Integer::max); System.out.println(max); } private static class FindMaxSpliterator implements Spliterator<Integer> { int start, end; int[] arr; public FindMaxSpliterator(int[] arr, int start, int end) { this.arr = arr; this.start = start; this.end = end; } @Override public boolean tryAdvance(Consumer<? super Integer> action) { if (start <= end) { action.accept(arr[start]); start++; return true; } return false; } @Override public Spliterator<Integer> trySplit() { if (end - start < 1000) { return null; } int mid = (start + end) / 2; // 可能会溢出,取中间值 int oldstart = start; start = mid + 1; return new FindMaxSpliterator(arr, oldstart, mid); } @Override public long estimateSize() { return end - start; } @Override public int characteristics() { // 注意这里的IMMUTABLE return ORDERED | SIZED | IMMUTABLE | SUBSIZED; } // 注意没有覆盖forEachRemaining()方法,使用默认的实现,即串行消费所有剩余元素 } }
它的工作原理可以用下面的这张图来描述: !a
Parallel Stream
使用了 ForkJoinPool
和 Spliterator
的特性实现了并行计算,但是需要注意使用它的场景:只适用集合数据量大的场景,比如只有几千个元素的场景使用并行流的方式计算将小高更多的资源,因为分割和合并会消耗很大的资源,而串行计算则不需要。
本文首次发布于ElseF’s Blog, 作者 @stuartlau , 转载请保留原文链接.