在Java 7之前,并行处理数据集合非常麻烦。第一,你得明确地把包含数据的数据结构分成若干子部分。第二,你要给每个子部分分配一个独立的线程。第三,你需要在恰当的时候对它们进行同步来避免不希望出现的竞争条件,等待所有线程完成,最后把这些部分结果合并起来。Java 7引入了一个叫作分支/合并的框架,让这些操作更稳定、更不易出错。
Stream接口让你不用太费力气就能对数据集执行并行操作。它允许你声明性地将顺序流变为并行流。此外,你将看到Java是如何变戏法的,或者更实际地来说, 流是如何在幕后应用Java 7引入的分支/合并框架的。
并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。
public static long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1) .limit(n) .reduce(0L, Long::sum); } 传统写法: public static long iterativeSum(long n) { long result = 0; for (long i = 1L; i <= n; i++) { result += i; } return result; }复制代码
可以把流转换成并行流,从而让前面的函数归约过程(也就是求和)并行运行——对顺序流调用parallel方法:
public static long parallelSum(long n) { return Stream.iterate(1L, i -> i + 1) .limit(n) .parallel() .reduce(0L, Long::sum); }复制代码
在现实中,对顺序流调用parallel方法并不意味着流本身有任何实际的变化。它在内部实际上就是设了一个boolean标志,表示你想让调用parallel之后进行的所有操作都并行执行。类似地,你只需要对并行流调用sequential方法就可以把它变成顺序流。请注意,你可能以为把这两个方法结合起来,就可以更细化地控制在遍历流时哪些操作要并行执行,哪些要顺序执行。
配置并行流使用的线程池
看看流的parallel方法,你可能会想,并行流用的线程是从哪来的?有多少个?怎么自定义这个过程呢?
并行流内部使用了默认的ForkJoinPool,它默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime().available- Processors()得到的。
但是你可以通过系统属性 java.util.concurrent.ForkJoinPool.common.parallelism来改变线程池大小,如下所示:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");
这是一个全局设置,因此它将影响代码中所有的并行流。反过来说,目前还无法专为某个并行流指定这个值。一般而言,让ForkJoinPool的大小等于处理器数量是个不错的默认值,
除非你有很好的理由,否则我们强烈建议你不要修改它。
并行编程可能很复杂,有时候甚至有点违反直觉。如果用得不对(比如采用了一 个不易并行化的操作,如iterate),它甚至可能让程序的整体性能更差,所以在调用那个看似神奇的parallel操作时,了解背后到底发生了什么是很有必要的。并行化并不是没有代价的。并行化过程本身需要对流做递归划分,把每个子流的归纳操作分配到不同的线程,然后把这些操作的结果合并成一个值。但在多个内核之间移动数据的代价也可能比你想的要大,所以很重要的一点是要保证在内核中并行执行工作的时间比在内核之间传输数据的时间长。总而言之,很多情况下不可能或不方便并行化。然而,在使用 并行Stream加速代码之前,你必须确保用得对;如果结果错了,算得快就毫无意义了。
错用并行流而产生错误的首要原因,就是使用的算法改变了某些共享状态。下面是另一种实现对前n个自然数求和的方法,但这会改变一个共享累加器:
public static long sideEffectSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).forEach(accumulator::add) return accumulator.total; } public class Accumulator { public long total = 0; public void add(long value) { total += value; } }复制代码
这段代码本身上就是顺序的,因为每次访问total都会出现数据竞争。接下来将这段代码改为并行:
public static long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); return accumulator.total;} System.out.println("SideEffect parallel sum done in: " + measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) +" msecs" ); Result: 5959989000692 Result: 7425264100768 Result: 6827235020033 Result: 7192970417739 Result: 6714157975331 Result: 7715125932481 SideEffect parallel sum done in: 49 msecs复制代码
这回方法的性能无关紧要了,唯一要紧的是每次执行都会返回不同的结果,都离正确值50000005000000差很远。这是由于多个线程在同时访问累加器,执行total += value,而这一句 然看似简单,却不是一个原子操作。问题的根源在于,forEach中调用的方法有副作用,它会改变多个线程共享的对象的可变状态。要是你想用并行Stream又不想引发类似的意外,就必须避免这种情况。现在你知道了,共享可变状态会影响并行流以及并行计算。
并行流背后使用的基础架构是Java 7中引入的分支/合并框架。
分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。
要把任务提交到这个池,必须创建RecursiveTask
protected abstract R compute();复制代码
这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。下图表示了递归任务的拆分过程:
让我们试着用这个框架为一个数字范围(这里用一个 long[]数组表示)求和。如前所述,你需要先为RecursiveTask类做一个实现,就是下面代码清单中的ForkJoinSumCalculator。
public class ForkJoinSumCalculator extends RecursiveTask<Long> { private final long[] numbers; private final int start; private final int end; public static final long THRESHOLD = 10_000; public ForkJoinSumCalculator(long[] numbers) { this(numbers, 0, numbers.length); } public ForkJoinSumCalculator(long[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } @Override protected Long compute() { int length = end - start; if (length <= THRESHOLD) { return computeSequentially(); } ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2); leftTask.fork(); ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end); Long rightResult = rightTask.compute(); Long leftResult = leftTask.join(); return leftResult + rightResult; } private long computeSequentially() { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } }复制代码
这里用了一个LongStream来生成包含前n个自然数的数组,然后创建一个ForkJoinTask (RecursiveTask的父类),并把数组传递给代码清单7-2所示ForkJoinSumCalculator的公共构造函数。最后,你创建了一个新的ForkJoinPool,并把任务传给它的调用方法 。在ForkJoinPool中执行时,最后一个方法返回的值就是ForkJoinSumCalculator类定义的任务结果。请注意在实际应用时,使用多个ForkJoinPool是没有什么意义的。正是出于这个原因,一般来说把它实例化一次,然后把实例保存在静态字段中,使之成为单例,这样就可以在软件中任何部分方便地重用了。这里创建时用了其默认的无参数构造函数,这意味着想让线程池使用JVM能够使用的所有处理器。更确切地说,该构造函数将使用Runtime.availableProcessors的返回值来决定线程 使用的线程数。请注意availableProcessors方法虽然看起来是处理器, 但它实际上返回的是可用内核的数量,包括超线程生成的虚拟内核。当把ForkJoinSumCalculator任务传给ForkJoinPool时,这个任务就由 中的一个线程 执行,这个线程会调用任务的compute方法。该方法会检查任务是否小到足以顺序执行,如果不够小则会把要求和的数组分成两半,分给两个新的ForkJoinSumCalculator,而它们也由ForkJoinPool安排执行。因此,这一过程可以递归重复,把原任务分为更小的任务,直到满足不方便或不可能再进一步拆分的条件(本例中是求和的项目数小于等于10000)。这时会顺序计算每个任务的结果,然后由分支过程创建的(隐含的)任务二叉树遍历回到它的根。接下来会合并每个子任务的部分结果,从而得到总任务的结果。这一过程如下图所示。
实际中,每个子任务所花的时间可能天差地别,要么是因为划分策略效率低,要么是有不可预知的原因,比如磁盘访问慢,或是需要和外部任务协调执行。分支/合并框架工程用一种称为工作窃取(work stealing)的技术来解决这个问题。在实际应用中,这意味着这些任务差不多被平均分配到ForkJoinPool中的所有线程上。每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。基于前面所述的原因,某个线程可能早早完成了分配给它的所有任务,也就是它的队列已经空了,而其他的线程还很忙。这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队列都清空。这就是为什么要划成许多小任务而不是少数几个大任务,这有助于更好地在工作线程之间平衡负载。一般来说,这种工作窃取算法用于在池中的工作线程之间重新分配和平衡任务。
Spliterator是Java 8中加入的另一个新接口;这个名字代表“可分迭代器”(splitable iterator)。和Iterator一样,Spliterator也用于遍历数据源中的元素,但它是为了并行执行而设计的。
public interface Spliterator<T> { boolean tryAdvance(Consumer<? super T> action); Spliterator<T> trySplit(); long estimateSize(); int characteristics(); }复制代码
与往常一样,T是Spliterator遍历的元素的类型。tryAdvance方法的行为类似于普通的 Iterator,因为它会按顺序一个一个使用Spliterator中的元素,并且如果还有其他元素要遍历就返回true。但trySplit是专为Spliterator接口设计的,因为它可以把一些元素划出去分给第二个Spliterator(由该方法返回),让它们两个并行处理。Spliterator还可通过 estimateSize方法估计还剩下多少元素要遍历,因为即使不那么确切,能快速算出来是一个值也有助于让拆分均匀一点。
将Stream拆分成多个部分的算法是一个递 过程,如图7-6所示。第一步是对第一个 Spliterator调用trySplit,生成第二个Spliterator。第二步对这两个Spliterator调用 trysplit,这样总共就有了四个Spliterator。这个框架不断对Spliterator调用trySplit直到它返回null,表明它处理的数据结构不能再分割,如第三步所示。最后,这个递归拆分过程到第四步就终止了,这时所有的Spliterator在调用trySplit时都返回了null。
Spliterator的特性 Spliterator接口声明的最后一个抽象方法是characteristics,它将返回一个int,代 表Spliterator本身特性集的编码。 使用Spliterator的客户可以用这些特性来更好地控制和优化它的使用。 表7-2总结了这些特性。(不幸的是,虽然它们在概念上与收集器的特性有重叠,编码却不一样。) ![](https://user-gold-cdn.xitu.io/2019/8/28/16cd56026a7081e8?w=1260&h=456&f=png&s=67011)复制代码
略
在本章中,你了解了以下内容。