在前面三章中,我们已经看到了新的 Stream 接口可以让你以声明性方式处理数据集。我们还解释了将外部迭代换为内部迭代能够让原生Java库控制流元素的处理。这种方法让Java程序员无需显式实现优化来为数据集的处理加速。到目前为止,最重要的好处是可以对这些集合执行操作流水线,能够自动利用计算机上的多个内核。
例如,在Java 7之前,并行处理数据集合非常麻烦。第一,你得明确地把包含数据的数据结构分成若干子部分。第二,你要给每个子部分分配一个独立的线程。第三,你需要在恰当的时候对它们进行同步来避免不希望出现的竞争条件,等待所有线程完成,最后把这些部分结果合并起来。Java 7引入了一个叫作分支/合并的框架,让这些操作更稳定、更不易出错。
在本章中,我们将了解 Stream 接口如何让你不用太费力气就能对数据集执行并行操作。它允许你声明性地将顺序流变为并行流。此外,你将看到Java是如何变戏法的,或者更实际地来说,流是如何在幕后应用Java 7引入的分支/合并框架的。你还会发现,了解并行流内部是如何工作的很重要,因为如果你忽视这一方面,就可能因误用而得到意外的(很可能是错的)结果。
我们会特别演示,在并行处理数据块之前,并行流被划分为数据块的方式在某些情况下恰恰是这些错误且无法解释的结果的根源。因此,我们将会学习如何通过实现和使用你自己的Spliterator 来控制这个划分过程。
在第4章的笔记中,我们简要地了解到了 Stream 接口可以让你非常方便地处理它的元素:可以通过对收集源调用 parallelStream 方法来把集合转换为并行流。并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这样一来,你就可以自动把给定操作的工作负荷分配给多核处理器的所有内核,让它们都忙起来。让我们用一个简单的例子来试验一下这个思想。
假设你需要写一个方法,接受数字n作为参数,并返回从1到给定参数的所有数字的和。一个直接(也许有点土)的方法是生成一个无穷大的数字流,把它限制到给定的数目,然后用对两个数字求和的 BinaryOperator 来归约这个流,如下所示:
public static long sequentialSum(long n) { // 生成自然数无限流 return Stream.iterate(1L, i -> i + 1) // 限制到前n个数 .limit(n) // 对所有数字求和来归纳流 .reduce(0L, Long::sum); } 复制代码
用更为传统的Java术语来说,这段代码与下面的迭代等价:
public static long iterativeSum(long n) { long result = 0; for (long i = 0; i <= n; i++) { result += i; } return result; } 复制代码
这似乎是利用并行处理的好机会,特别是n很大的时候。那怎么入手呢?你要对结果变量进行同步吗?用多少个线程呢?谁负责生成数呢?谁来做加法呢?根本用不着担心啦。用并行流的话,这问题就简单多了!
我们可以把流转换成并行流,从而让前面的函数归约过程(也就是求和)并行运行——对顺序流调用 parallel 方法:
public static long parallelSum(long n) { // 生成自然数无限流 return Stream.iterate(1L, i -> i + 1) // 限制到前n个数 .limit(n) // 将流转为并行流 .parallel() // 对所有数字求和来归纳流 .reduce(0L, Long::sum); } 复制代码
并行流的执行过程:
请注意,在现实中,对顺序流调用 parallel 方法并不意味着流本身有任何实际的变化。它在内部实际上就是设了一个 boolean 标志,表示你想让调用 parallel 之后进行的所有操作都并行执行。类似地,你只需要对并行流调用 sequential 方法就可以把它变成顺序流。请注意,你可能以为把这两个方法结合起来,就可以更细化地控制在遍历流时哪些操作要并行执行,哪些要顺序执行。例如,你可以这样做:
stream.parallel() .filter(...) .sequential() .map(...) .parallel() .reduce(); 复制代码
但最后一次 parallel 或 sequential 调用会影响整个流水线。在本例中,流水线会并行执行,因为最后调用的是它。
回到我们的数字求和练习,我们说过,在多核处理器上运行并行版本时,会有显著的性能提升。现在你有三个方法,用三种不同的方式(迭代式、顺序归纳和并行归纳)做完全相同的操作,让我们看看谁最快吧!
我们声称并行求和方法应该比顺序和迭代方法性能好。然而在软件工程上,靠猜绝对不是什么好办法!特别是在优化性能时,你应该始终遵循三个黄金规则:测量,测量,再测量。
public static long measurePerf(Function<Long, Long> adder, long n) { long fastest = Long.MAX_VALUE; for (int i = 0; i < 10; i++) { long start = System.nanoTime(); long sum = adder.apply(n); long duration = (System.nanoTime() - start) / 1_000_000; System.out.println("Result: " + sum); if (duration < fastest) { fastest = duration; } } return fastest; } 复制代码
这个方法接受一个函数和一个 long 作为参数。它会对传给方法的 long 应用函数10次,记录每次执行的时间(以毫秒为单位),并返回最短的一次执行时间。假设你把先前开发的所有方法都放进了一个名为 ParallelStreams 的类,你就可以用这个框架来测试顺序加法器函数对前一千万个自然数求和要用多久:
System.out.println("Sequential sum done in:" + measurePerf(ParallelStreams::sequentialSum, 10_000_000) + " msecs"); 复制代码
请注意,我们对这个结果应持保留态度。影响执行时间的因素有很多,比如你的电脑支持多少个内核。你可以在自己的机器上跑一下这些代码。在一台i5 6200U 的笔记本上运行它,输出是这样的:
Sequential sum done in:110 msecs 复制代码
用传统 for 循环的迭代版本执行起来应该会快很多,因为它更为底层,更重要的是不需要对原始类型做任何装箱或拆箱操作。如果你试着测量它的性能:
System.out.println("Iterative sum done in:" + measurePerf(ParallelStreams::iterativeSum, 10_000_000) + " msecs"); 复制代码
将得到:
Iterative sum done in:4 msecs 复制代码
现在我们来对函数的并行版本做测试:
System.out.println("Parallel sum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000) + " msecs"); 复制代码
看看会出现什么情况:
Parallel sum done in: 525 msecs 复制代码
这相当令人失望,求和方法的并行版本比顺序版本要慢很多。你如何解释这个意外的结果呢?这里实际上有两个问题:
第二个问题更有意思一点,因为你必须意识到某些流操作比其他操作更容易并行化。具体来说, iterate 很难分割成能够独立执行的小块,因为每次应用这个函数都要依赖前一次应用的结果。
这意味着,在这个特定情况下,归纳进程不是像上图那样进行的;整张数字列表在归纳过程开始时没有准备好,因而无法有效地把流划分为小块来并行处理。把流标记成并行,你其实是给顺序处理增加了开销,它还要把每次求和操作分到一个不同的线程上。
这就说明了并行编程可能很复杂,有时候甚至有点违反直觉。如果用得不对(比如采用了一个不易并行化的操作,如 iterate ),它甚至可能让程序的整体性能更差,所以在调用那个看似神奇的 parallel 操作时,了解背后到底发生了什么是很有必要的。
使用更有针对性的方法
那到底要怎么利用多核处理器,用流来高效地并行求和呢?我们在第5章中讨论了一个叫LongStream.rangeClosed 的方法。这个方法与 iterate 相比有两个优点。
让我们先看一下它用于顺序流时的性能如何,看看拆箱的开销到底要不要紧:
public static long rangedSum(long n) { return LongStream.rangeClosed(1, n) .reduce(0L, Long::sum); } 复制代码
这一次的输出是:
Ranged sum done in: 5 msecs 复制代码
这个数值流比前面那个用 iterate 工厂方法生成数字的顺序执行版本要快得多,因为数值流避免了非针对性流那些没必要的自动装箱和拆箱操作。由此可见,选择适当的数据结构往往比并行化算法更重要。但要是对这个新版本应用并行流呢?
public static long parallelRangedSum(long n) { return LongStream.rangeClosed(1, n) .parallel() .reduce(0L, Long::sum); } 复制代码
现在把这个函数传给的测试方法:
System.out.println("Parallel range sum done in:" + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000) + " msecs"); 复制代码
你会得到:
Parallel range sum done in:2 msecs 复制代码
amazing!终于,我们得到了一个比顺序执行更快的并行归纳,因为这一次归纳操作可以像并行流执行图那样执行了。这也表明,使用正确的数据结构然后使其并行工作能够保证最佳的性能。
尽管如此,请记住,并行化并不是没有代价的。并行化过程本身需要对流做递归划分,把每个子流的归纳操作分配到不同的线程,然后把这些操作的结果合并成一个值。但在多个内核之间移动数据的代价也可能比你想的要大,所以很重要的一点是要保证在内核中并行执行工作的时间比在内核之间传输数据的时间长。总而言之,很多情况下不可能或不方便并行化。然而,在使用并行 Stream 加速代码之前,你必须确保用得对;如果结果错了,算得快就毫无意义了。让我们来看一个常见的陷阱。
错用并行流而产生错误的首要原因,就是使用的算法改变了某些共享状态。下面是另一种实现对前n个自然数求和的方法,但这会改变一个共享累加器:
public static long sideEffectSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n) .forEach(accumulator::add); return accumulator.total; } public static class Accumulator { private long total = 0; public void add(long value) { total += value; } } 复制代码
这种代码非常普遍,特别是对那些熟悉指令式编程范式的程序员来说。这段代码和你习惯的那种指令式迭代数字列表的方式很像:初始化一个累加器,一个个遍历列表中的元素,把它们和累加器相加。
那这种代码又有什么问题呢?不幸的是,它真的无可救药,因为它在本质上就是顺序的。每次访问 total 都会出现数据竞争。如果你尝试用同步来修复,那就完全失去并行的意义了。为了说明这一点,让我们试着把 Stream 变成并行的:
public static long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n) .parallel() .forEach(accumulator::add); return accumulator.total; } 复制代码
执行测试方法,并打印每次执行的结果:
System.out.println("SideEffect parallel sum done in: " + measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) + " msecs"); 复制代码
你可能会得到类似于下面这种输出:
Result: 9869563545574 Result: 12405006536090 Result: 8268141260766 Result: 11208597038187 Result: 12358062322272 Result: 19218969315182 Result: 11255083226412 Result: 25746147125980 Result: 13327069088874 SideEffect parallel sum done in: 4 msecs 复制代码
这回方法的性能无关紧要了,唯一要紧的是每次执行都会返回不同的结果,都离正确值50000005000000 差很远。这是由于多个线程在同时访问累加器,执行 total += value ,而这一句虽然看似简单,却不是一个原子操作。问题的根源在于, forEach 中调用的方法有副作用,它会改变多个线程共享的对象的可变状态。要是你想用并行 Stream 又不想引发类似的意外,就必须避免这种情况。
现在你知道了,共享可变状态会影响并行流以及并行计算。现在,记住要避免共享可变状态,确保并行 Stream 得到正确的结果。接下来,我们会看到一些实用建议,你可以由此判断什么时候可以利用并行流来提升性能。
一般而言,想给出任何关于什么时候该用并行流的定量建议都是不可能也毫无意义的,因为任何类似于“仅当至少有一千个(或一百万个或随便什么数字)元素的时候才用并行流)”的建议对于某台特定机器上的某个特定操作可能是对的,但在略有差异的另一种情况下可能就是大错特错。尽管如此,我们至少可以提出一些定性意见,帮你决定某个特定情况下是否有必要使用并行流。
最后,我们还要强调并行流背后使用的基础架构是Java 7中引入的分支/合并框架。并行汇总的示例证明了要想正确使用并行流,了解它的内部原理至关重要,所以我们会在下一节仔细研究分支/合并框架。
分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是 ExecutorService 接口的一个实现,它把子任务分配给线程池(称为 ForkJoinPool )中的工作线程。首先来看看如何定义任务和子任务。
要把任务提交到这个池,必须创建 RecursiveTask 的一个子类,其中 R 是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是 RecursiveAction 类型(当然它可能会更新其他非局部机构)。要定义 RecursiveTask, 只需实现它唯一的抽象方法compute :
protected abstract R compute(); 复制代码
这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。正由于此,这个方法的实现类似于下面的伪代码:
if (任务足够小或不可分) { 顺序计算该任务 } else { 将任务分成两个子任务 递归调用本方法,拆分每个子任务,等待所有子任务完成 合并每个子任务的结果 } 复制代码
一般来说并没有确切的标准决定一个任务是否应该再拆分,但有几种试探方法可以帮助你做出这一决定。
你可能已经注意到,这只不过是著名的分治算法的并行版本而已。这里举一个用分支/合并框架的实际例子,还以前面的例子为基础,让我们试着用这个框架为一个数字范围(这里用一个long[] 数组表示)求和。如前所述,你需要先为RecursiveTask类做一个实现,就是下面代码清单中的ForkJoinSumCalculator 。
public class ForkJoinSumCalculator extends RecursiveTask<Long> { /** * 不再将任务分解为子任务的数组大小 */ public static final long THRESHOLD = 10_000; /** * 要求和的数组 */ private final long[] numbers; /** * 子任务处理的数组的起始和终止位置 */ private final int start; private final int end; public ForkJoinSumCalculator(long[] numbers) { this(numbers, 0, numbers.length); } private ForkJoinSumCalculator(long[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } @Override protected Long compute() { // 该任务负责求和的部分的大小 int length = end - start; // 如果大小小于或等于阈值,顺序计算结果 if (length <= THRESHOLD) { return computeSequentially(); } // 创建一个子任务来为数组的前一半求和 ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2); leftTask.fork(); // 利用另一个ForkJoinPool线程异步执行新创建的子任务 ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end); // 同步执行第二个子任务,有可能允许进一步递归划分 Long rightResult = rightTask.compute(); // 读取第一个子任务的结果,如果尚未完成就等待 Long leftResult = leftTask.join(); // 该任务的结果是两个子任务结果的组合 return leftResult + rightResult; } private Long computeSequentially() { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } } 复制代码
现在编写一个方法来并行对前n个自然数求和就很简单了。你只需把想要的数字数组传给ForkJoinSumCalculator 的构造函数:
public static long forkJoinSum(long n) { long[] numbers = LongStream.rangeClosed(1, n).toArray(); ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers); return new ForkJoinPool().invoke(task); } 复制代码
这里用了一个 LongStream 来生成包含前n个自然数的数组,然后创建一个 ForkJoinTask( RecursiveTask 的父类),并把数组传递 ForkJoinSumCalculator 的公共构造函数。最后,你创建了一个新的 ForkJoinPool ,并把任务传给它的调用方法 。在ForkJoinPool 中执行时,最后一个方法返回的值就是 ForkJoinSumCalculator 类定义的任务结果。
请注意在实际应用时,使用多个 ForkJoinPool 是没有什么意义的。正是出于这个原因,一般来说把它实例化一次,然后把实例保存在静态字段中,使之成为单例,这样就可以在软件中任何部分方便地重用了。这里创建时用了其默认的无参数构造函数,这意味着想让线程池使用JVM能够使用的所有处理器。更确切地说,该构造函数将使用 Runtime.availableProcessors 的返回值来决定线程池使用的线程数。请注意 availableProcessors 方法虽然看起来是处理器,但它实际上返回的是可用内核的数量,包括超线程生成的虚拟内核。
当把 ForkJoinSumCalculator 任务传给 ForkJoinPool 时,这个任务就由池中的一个线程执行,这个线程会调用任务的 compute 方法。该方法会检查任务是否小到足以顺序执行,如果不够小则会把要求和的数组分成两半,分给两个新的 ForkJoinSumCalculator ,而它们也由ForkJoinPool 安排执行。因此,这一过程可以递归重复,把原任务分为更小的任务,直到满足不方便或不可能再进一步拆分的条件(本例中是求和的项目数小于等于10 000)。这时会顺序计算每个任务的结果,然后由分支过程创建的(隐含的)任务二叉树遍历回到它的根。接下来会合并每个子任务的部分结果,从而得到总任务的结果。
你可以再用一次本章开始时写的测试框架,来看看显式使用分支/合并框架的求和方法的性能:
System.out.println("ForkJoin sum done in: " + measurePerf( ForkJoinSumCalculator::forkJoinSum, 10_000_000) + " msecs"); 复制代码
它生成以下输出:
ForkJoin sum done in: 41 msecs 复制代码
这个性能看起来比用并行流的版本要差,但这只是因为必须先要把整个数字流都放进一个long[] ,之后才能在 ForkJoinSumCalculator 任务中使用它。
虽然分支/合并框架还算简单易用,不幸的是它也很容易被误用。以下是几个有效使用它的最佳做法。
对于分支/合并拆分策略还有最后一点补充:你必须选择一个标准,来决定任务是要进一步拆分还是已小到可以顺序求值。
在 ForkJoinSumCalculator 的例子中,我们决定在要求和的数组中最多包含10 000个项目时就不再创建子任务了。这个选择是很随意的,但大多数情况下也很难找到一个好的启发式方法来确定它,只能试几个不同的值来尝试优化它。在我们的测试案例中,我们先用了一个有1000万项目的数组,意味着 ForkJoinSumCalculator 至少会分出1000个子任务来。这似乎有点浪费资源,因为我们用来运行它的机器上只有四个内核。在这个特定例子中可能确实是这样,因为所有的任务都受CPU约束,预计所花的时间也差不多。
但分出大量的小任务一般来说都是一个好的选择。这是因为,理想情况下,划分并行任务时,应该让每个任务都用完全相同的时间完成,让所有的CPU内核都同样繁忙。不幸的是,实际中,每个子任务所花的时间可能天差地别,要么是因为划分策略效率低,要么是有不可预知的原因,比如磁盘访问慢,或是需要和外部服务协调执行。
分支/合并框架工程用一种称为工作窃取(work stealing)的技术来解决这个问题。在实际应用中,这意味着这些任务差不多被平均分配到 ForkJoinPool 中的所有线程上。每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。基于前面所述的原因,某个线程可能早早完成了分配给它的所有任务,也就是它的队列已经空了,而其他的线程还很忙。这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队列都清空。这就是为什么要划成许多小任务而不是少数几个大任务,这有助于更好地在工作线程之间平衡负载。
一般来说,这种工作窃取算法用于在池中的工作线程之间重新分配和平衡任务。当工作线程队列中有一个任务被分成两个子任务时,一个子任务就被闲置的工作线程“偷走”了。如前所述,这个过程可以不断递归,直到规定子任务应顺序执行的条件为真。
现在你应该清楚流如何使用分支/合并框架来并行处理它的项目了,不过还有一点没有讲。本节中我们分析了一个例子,你明确地指定了将数字数组拆分成多个任务的逻辑。但是,使用本章前面讲的并行流时就用不着这么做了,这就意味着,肯定有一种自动机制来为你拆分流。这种新的自动机制称为 Spliterator ,我们会在下一节中讨论。
Spliterator 是Java 8中加入的另一个新接口;这个名字代表“可分迭代器”(splitableiterator)。和 Iterator 一样, Spliterator 也用于遍历数据源中的元素,但它是为了并行执行而设计的。虽然在实践中可能用不着自己开发 Spliterator ,但了解一下它的实现方式会让你对并行流的工作原理有更深入的了解。Java 8已经为集合框架中包含的所有数据结构提供了一个默认的 Spliterator 实现。集合实现了 Spliterator 接口,接口提供了一个 spliterator 方法。这个接口定义了若干方法,如下面的代码清单所示。
public interface Spliterator<T> { boolean tryAdvance(Consumer<? super T> action); Spliterator<T> trySplit(); long estimateSize(); int characteristics(); } 复制代码
与往常一样, T 是 Spliterator 遍历的元素的类型。 tryAdvance 方法的行为类似于普通的Iterator ,因为它会按顺序一个一个使用 Spliterator 中的元素,并且如果还有其他元素要遍历就返回 true 。但 trySplit 是专为 Spliterator 接口设计的,因为它可以把一些元素划出去分给第二个 Spliterator (由该方法返回),让它们两个并行处理。 Spliterator 还可通过estimateSize 方法估计还剩下多少元素要遍历,因为即使不那么确切,能快速算出来是一个值也有助于让拆分均匀一点。
重要的是,要了解这个拆分过程在内部是如何执行的,以便在需要时能够掌控它。因此,我们会在下一节中详细地分析它。
将 Stream 拆分成多个部分的算法是一个递归过程。第一步是对第一个Spliterator 调用 trySplit ,生成第二个 Spliterator 。第二步对这两个 Spliterator 调用trysplit ,这样总共就有了四个 Spliterator 。这个框架不断对 Spliterator 调用 trySplit直到它返回 null ,表明它处理的数据结构不能再分割,如第三步所示。最后,这个递归拆分过程到第四步就终止了,这时所有的 Spliterator 在调用 trySplit 时都返回了 null 。
这个拆分过程也受 Spliterator 本身的特性影响,而特性是通过 characteristics 方法声明的。
让我们来看一个可能需要你自己实现 Spliterator 的实际例子。我们要开发一个简单的方法来数数一个 String 中的单词数。这个方法的一个迭代版本可以写成下面的样子。
public static int countWordsIteratively(String s) { int counter = 0; boolean lastSpace = true; for (char c : s.toCharArray()) { if (Character.isWhitespace(c)) { lastSpace = true; } else { if (lastSpace) { counter++; } lastSpace = Character.isWhitespace(c); } } return counter; } 复制代码
让我们把这个方法用在但丁的《神曲》的《地狱篇》的第一句话上:
public static final String SENTENCE = " Nel mezzo del cammin di nostra vita " + "mi ritrovai in una selva oscura" + " che la dritta via era smarrita "; System.out.println("Found " + countWordsIteratively(SENTENCE) + " words"); 复制代码
请注意,我们在句子里添加了一些额外的随机空格,以演示这个迭代实现即使在两个词之间存在多个空格时也能正常工作。正如我们所料,这段代码将打印以下内容:
Found 19 words 复制代码
理想情况下,你会想要用更为函数式的风格来实现它,因为就像我们前面说过的,这样你就可以用并行 Stream 来并行化这个过程,而无需显式地处理线程和同步问题。
首先你需要把 String 转换成一个流。不幸的是,原始类型的流仅限于 int 、 long 和 double , 所以你只能用 Stream :
Stream<Character> stream = IntStream.range(0, SENTENCE.length()) .mapToObj(SENTENCE::charAt); 复制代码
你可以对这个流做归约来计算字数。在归约流时,你得保留由两个变量组成的状态:一个 int用来计算到目前为止数过的字数,还有一个 boolean 用来记得上一个遇到的 Character 是不是空格。因为Java没有元组(tuple,用来表示由异类元素组成的有序列表的结构,不需要包装对象),所以你必须创建一个新类 WordCounter 来把这个状态封装起来,如下所示。
private static class WordCounter { private final int counter; private final boolean lastSpace; public WordCounter(int counter, boolean lastSpace) { this.counter = counter; this.lastSpace = lastSpace; } public WordCounter accumulate(Character c) { if (Character.isWhitespace(c)) { return lastSpace ? this : new WordCounter(counter, true); } else { return lastSpace ? new WordCounter(counter + 1, false) : this; } } public WordCounter combine(WordCounter wordCounter) { return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace); } public int getCounter() { return counter; } } 复制代码
在这个列表中, accumulate 方法定义了如何更改 WordCounter 的状态,或更确切地说是用哪个状态来建立新的 WordCounter ,因为这个类是不可变的。每次遍历到 Stream 中的一个新的Character 时,就会调用 accumulate 方法。具体来说,就像 countWordsIteratively 方法一样,当上一个字符是空格,新字符不是空格时,计数器就加一。
调用第二个方法 combine 时,会对作用于 Character 流的两个不同子部分的两个WordCounter 的部分结果进行汇总,也就是把两个 WordCounter 内部的计数器加起来。
private static int countWords(Stream<Character> stream) { WordCounter wordCounter = stream.reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine); return wordCounter.getCounter(); } 复制代码
现在你就可以试一试这个方法,给它由包含但丁的《神曲》中《地狱篇》第一句的 String创建的流:
Stream<Character> stream = IntStream.range(0, SENTENCE.length()) .mapToObj(SENTENCE::charAt); System.out.println("Found " + countWords(stream) + " words"); 复制代码
你可以和迭代版本比较一下输出:
Found 19 words 复制代码
到现在为止都很好,但我们以函数式实现 WordCounter 的主要原因之一就是能轻松地并行处理,让我们来看看具体是如何实现的。
你可以尝试用并行流来加快字数统计,如下所示:
System.out.println("Found " + countWords(stream.parallel()) + " words"); 复制代码
不幸的是,这次的输出是:
Found 25 words 复制代码
显然有什么不对,可到底是哪里不对呢?问题的根源并不难找。因为原始的 String 在任意位置拆分,所以有时一个词会被分为两个词,然后数了两次。这就说明,拆分流会影响结果,而把顺序流换成并行流就可能使结果出错。
如何解决这个问题呢?解决方案就是要确保 String 不是在随机位置拆开的,而只能在词尾拆开。要做到这一点,你必须为 Character 实现一个 Spliterator ,它只能在两个词之间拆开String (如下所示),然后由此创建并行流。
private static class WordCounterSpliterator implements Spliterator<Character> { private final String string; private int currentChar = 0; public WordCounterSpliterator(String string) { this.string = string; } @Override public boolean tryAdvance(Consumer<? super Character> action) { action.accept(string.charAt(currentChar++)); return currentChar < string.length(); } @Override public Spliterator<Character> trySplit() { int currentSize = string.length() - currentChar; if (currentSize < 10) { return null; } for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) { if (Character.isWhitespace(string.charAt(splitPos))) { Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos)); currentChar = splitPos; return spliterator; } } return null; } @Override public long estimateSize() { return string.length() - currentChar; } @Override public int characteristics() { return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE; } } 复制代码
这个 Spliterator 由要解析的 String 创建,并遍历了其中的 Character ,同时保存了当前正在遍历的字符位置。让我们快速回顾一下实现了Spliterator接口的WordCounterSpliterator 中的各个函数。
现在就可以用这个新的 WordCounterSpliterator 来处理并行流了,如下所示:
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE); Stream<Character> stream = StreamSupport.stream(spliterator, true); 复制代码
传给 StreamSupport.stream 工厂方法的第二个布尔参数意味着你想创建一个并行流。把这个并行流传给 countWords 方法:
System.out.println("Found " + countWords(stream.parallel()) + " words"); 复制代码
可以得到意料之中的正确输出:
Found 19 words 复制代码
你已经看到了 Spliterator 如何让你控制拆分数据结构的策略。 Spliterator 还有最后一个值得注意的功能,就是可以在第一次遍历、第一次拆分或第一次查询估计大小时绑定元素的数据源,而不是在创建时就绑定。这种情况下,它称为延迟绑定(late-binding)的 Spliterator 。