并行计算涉及将问题划分为子问题,同时解决这些问题(并行地,每个子问题运行在一个单独的线程中),然后将子问题的解决结果组合起来。Java SE提供了fork/join框架,它使你能够更容易地在应用程序中实现并行计算,但是,使用这个框架,你必须指定问题如何被细分(分区),使用聚合操作,Java运行时将为你执行这种分区和组合解决方案。
在使用集合的应用程序中实现并行性的一个困难是集合不是线程安全的,这意味着多个线程不能在不引入线程干扰或内存一致性错误的情况下操作集合, Collections
框架提供同步包装器,可以将自动同步添加到任意集合,使其线程安全。但是,同步会引入线程争用,你希望避免线程争用,因为这会阻止线程并行运行。聚合操作和并行流使你能够使用非线程安全的集合实现并行性,前提是在操作集合时不修改集合。
请注意,并行性并不会自动比串行执行操作快,尽管如果你有足够的数据和处理器内核,并行性可以更快。虽然聚合操作使你能够更容易地实现并行性,但是确定应用程序是否适合并行性仍然是你的职责。
你可以在示例 ParallelismExamples 中找到本节中描述的代码摘录。
你可以串行或并行执行流,当流并行执行时,Java运行时将流划分为多个子流,聚合操作迭代并并行处理这些子流,然后组合结果。
当你创建一个流时,它总是一个串行流,除非另有指定,要创建并行流,请调用操作 Collection.parallelStream ,或者,调用操作 BaseStream.parallel ,例如,下面的语句并行计算所有男性成员的平均年龄:
double average = roster .parallelStream() .filter(p -> p.getGender() == Person.Sex.MALE) .mapToInt(Person::getAge) .average() .getAsDouble();
再次考虑以下按性别对成员进行分组的示例(在小节归纳部分中进行了描述),这个例子调用了 collect
操作,它将集合 roster
归纳为 Map
:
Map<Person.Sex, List<Person>> byGender = roster .stream() .collect( Collectors.groupingBy(Person::getGender));
下面是等价的并行操作:
ConcurrentMap<Person.Sex, List<Person>> byGender = roster .parallelStream() .collect( Collectors.groupingByConcurrent(Person::getGender));
这称为并发归纳,如果以下所有条件对包含 collect
操作的特定管道都成立,Java运行时将执行并发归纳:
collect
操作的参数 collector
具有 Collector.Characteristics.CONCURRENT
特征,要确定 collector
的特征,请调用 Collector.characteristics
方法。 collector
具有 Collector.Characteristics.UNORDERED
特性,要确保流是无序的,请调用 BaseStream.unordered
操作。
注意:这个示例返回 ConcurrentMap
而不是 Map
的实例,并调用 groupingByConcurrent
操作而不是 groupingBy
,与 groupingByConcurrent
操作不同,并行流的 groupingBy
操作执行得很差(这是因为它通过按键合并两个映射来操作,这在计算上非常昂贵),类似地, Collectors.toConcurrentMap
操作在并行流中比 Collectors.toMap
操作执行得更好。
管道处理流元素的顺序取决于流是串行执行还是并行执行、流的源和中间操作,例如,考虑下面的例子,它使用 forEach
操作多次打印 ArrayList
实例的元素:
Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8 }; List<Integer> listOfIntegers = new ArrayList<>(Arrays.asList(intArray)); System.out.println("listOfIntegers:"); listOfIntegers .stream() .forEach(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("listOfIntegers sorted in reverse order:"); Comparator<Integer> normal = Integer::compare; Comparator<Integer> reversed = normal.reversed(); Collections.sort(listOfIntegers, reversed); listOfIntegers .stream() .forEach(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("Parallel stream"); listOfIntegers .parallelStream() .forEach(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("Another parallel stream:"); listOfIntegers .parallelStream() .forEach(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("With forEachOrdered:"); listOfIntegers .parallelStream() .forEachOrdered(e -> System.out.print(e + " ")); System.out.println("");
这个例子由五个管道组成,打印输出如下:
listOfIntegers: 1 2 3 4 5 6 7 8 listOfIntegers sorted in reverse order: 8 7 6 5 4 3 2 1 Parallel stream: 3 4 1 6 2 5 7 8 Another parallel stream: 6 3 1 5 7 8 4 2 With forEachOrdered: 8 7 6 5 4 3 2 1
这个示例执行以下操作:
listOfIntegers
的元素。 listOfIntegers
的元素。 forEachOrdered
的并行流操作,可能会失去并行性的好处。
方法或表达式除了返回或生成值外,如果还修改计算机的状态,则会产生副作用。例如可变归纳,以及调用 System.out.println
方法用于调试。JDK可以很好地处理管道中的某些副作用,特别地, collect
方法被设计用于以并行安全的方式执行最常见的流操作,这些操作具有副作用,像 forEach
和 peek
这样的操作是为副作用而设计的,返回 void
的lambda表达式,例如调用 System.out.println
的表达式,除了副作用什么都做不了。即便如此,你也应该小心使用 forEach
和 peek
操作,如果你将这些操作中的一个操作与并行流一起使用,那么Java运行时可能会从多个线程并发地调用你指定为其参数的lambda表达式。此外,永远不要作为参数传递lambda表达式,这些表达式在 filter
和 map
等操作中有副作用,下面几节讨论干扰和有状态lambda表达式,它们都可能是副作用的来源,并且可能返回不一致或不可预测的结果,特别是在并行流中。然而,惰性的概念首先被讨论,因为它对干扰有直接的影响。
所有中间操作都是惰性的,如果表达式、方法或算法的值只在需要时计算,那么它就是惰性的(如果一个算法被立即计算或处理,那么它就是立即的),中间操作是惰性的,因为它们直到终端操作开始时才开始处理流的内容。延迟处理流使Java编译器和运行时能够优化它们处理流的方式,例如,在管道中,如聚合操作一节中描述的 filter-mapToInt-average
示例, average
操作可以从 mapToInt
操作创建的流中获取几个整数, mapToInt
操作从 filter
操作中获取元素。 average
操作将重复这个过程,直到从流中获得所有需要的元素,然后计算平均值。
流操作中的Lambda表达式不应该干涉,当管道处理流时修改流的源时发生干扰,例如,下面的代码尝试连接列表 listofstring
中包含的字符串,但是,它抛出一个 ConcurrentModificationException
:
try { List<String> listOfStrings = new ArrayList<>(Arrays.asList("one", "two")); // This will fail as the peek operation will attempt to add the // string "three" to the source after the terminal operation has // commenced. String concatenatedString = listOfStrings .stream() // Don't do this! Interference occurs here. .peek(s -> listOfStrings.add("three")) .reduce((a, b) -> a + " " + b) .get(); System.out.println("Concatenated string: " + concatenatedString); } catch (Exception e) { System.out.println("Exception caught: " + e.toString()); }
这个示例使用 reduce
操作将 listofstring
中包含的字符串连接到一个可选的 String
值中, reduce
操作是一个终端操作,但是,这里的管道调用中间操作 peek
,该操作试图向 listofstring
添加一个新元素。记住,所有中间操作都是惰性的,这意味着本例中的管道在调用操作 get
时开始执行,在执行 get
操作完成时结束执行, peek
操作的参数试图在管道执行期间修改流源,这将导致Java运行时抛出 ConcurrentModificationException
。
避免在流操作中使用有状态lambda表达式作为参数,有状态lambda表达式的结果取决于在管道执行期间可能发生变化的任何状态,下面的示例使用 map
中间操作将列表 listOfIntegers
中的元素添加到一个新的 List
实例中,它这样做了两次,第一次是串行流,然后是并行流:
List<Integer> serialStorage = new ArrayList<>(); System.out.println("Serial stream:"); listOfIntegers .stream() // Don't do this! It uses a stateful lambda expression. .map(e -> { serialStorage.add(e); return e; }) .forEachOrdered(e -> System.out.print(e + " ")); System.out.println(""); serialStorage .stream() .forEachOrdered(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("Parallel stream:"); List<Integer> parallelStorage = Collections.synchronizedList( new ArrayList<>()); listOfIntegers .parallelStream() // Don't do this! It uses a stateful lambda expression. .map(e -> { parallelStorage.add(e); return e; }) .forEachOrdered(e -> System.out.print(e + " ")); System.out.println(""); parallelStorage .stream() .forEachOrdered(e -> System.out.print(e + " ")); System.out.println("");
lambda表达式 e -> { parallelStorage.add(e); return e; }
是一个有状态的lambda表达式,每次运行代码时,它的结果都可能不同,这个例子打印了以下内容:
Serial stream: 8 7 6 5 4 3 2 1 8 7 6 5 4 3 2 1 Parallel stream: 8 7 6 5 4 3 2 1 1 3 6 2 4 5 8 7
forEachOrdered
操作按照流指定的顺序处理元素,无论流是串行执行还是并行执行,但是,当并行执行流时, map
操作处理Java运行时和编译器指定的流的元素,因此,lambda表达式 e -> { parallelStorage.add(e); return e; }
每次运行代码时,向列表中添加元素的 parallelStorage
都会发生变化,对于确定性和可预测的结果,确保流操作中的lambda表达式参数不是有状态的。
注意:这个例子调用了 synchronizedList
方法,因此列表 parallelStorage
是线程安全的,记住集合不是线程安全的,这意味着多个线程不应该同时访问特定的集合,假设你在创建 parallelStorage
时没有调用 synchronizedList
方法:
List<Integer> parallelStorage = new ArrayList<>();
这个例子的行为是不规律的,因为多线程访问和修改 parallelStorage
时,没有同步之类的机制来调度特定线程何时可以访问 List
实例,因此,该示例可以打印如下输出:
Parallel stream: 8 7 6 5 4 3 2 1 null 3 5 4 7 8 1 2
上一篇:聚合归纳操作