转载

Java™ 教程(聚合并行性)

聚合并行性

并行计算涉及将问题划分为子问题,同时解决这些问题(并行地,每个子问题运行在一个单独的线程中),然后将子问题的解决结果组合起来。Java SE提供了fork/join框架,它使你能够更容易地在应用程序中实现并行计算,但是,使用这个框架,你必须指定问题如何被细分(分区),使用聚合操作,Java运行时将为你执行这种分区和组合解决方案。

在使用集合的应用程序中实现并行性的一个困难是集合不是线程安全的,这意味着多个线程不能在不引入线程干扰或内存一致性错误的情况下操作集合, Collections 框架提供同步包装器,可以将自动同步添加到任意集合,使其线程安全。但是,同步会引入线程争用,你希望避免线程争用,因为这会阻止线程并行运行。聚合操作和并行流使你能够使用非线程安全的集合实现并行性,前提是在操作集合时不修改集合。

请注意,并行性并不会自动比串行执行操作快,尽管如果你有足够的数据和处理器内核,并行性可以更快。虽然聚合操作使你能够更容易地实现并行性,但是确定应用程序是否适合并行性仍然是你的职责。

你可以在示例 ParallelismExamples 中找到本节中描述的代码摘录。

并行执行流

你可以串行或并行执行流,当流并行执行时,Java运行时将流划分为多个子流,聚合操作迭代并并行处理这些子流,然后组合结果。

当你创建一个流时,它总是一个串行流,除非另有指定,要创建并行流,请调用操作 Collection.parallelStream ,或者,调用操作 BaseStream.parallel ,例如,下面的语句并行计算所有男性成员的平均年龄:

double average = roster
    .parallelStream()
    .filter(p -> p.getGender() == Person.Sex.MALE)
    .mapToInt(Person::getAge)
    .average()
    .getAsDouble();

并发归纳

再次考虑以下按性别对成员进行分组的示例(在小节归纳部分中进行了描述),这个例子调用了 collect 操作,它将集合 roster 归纳为 Map

Map<Person.Sex, List<Person>> byGender =
    roster
        .stream()
        .collect(
            Collectors.groupingBy(Person::getGender));

下面是等价的并行操作:

ConcurrentMap<Person.Sex, List<Person>> byGender =
    roster
        .parallelStream()
        .collect(
            Collectors.groupingByConcurrent(Person::getGender));

这称为并发归纳,如果以下所有条件对包含 collect 操作的特定管道都成立,Java运行时将执行并发归纳:

  • 流是并行的。
  • collect 操作的参数 collector 具有 Collector.Characteristics.CONCURRENT 特征,要确定 collector 的特征,请调用 Collector.characteristics 方法。
  • 要么流是无序的,要么 collector 具有 Collector.Characteristics.UNORDERED 特性,要确保流是无序的,请调用 BaseStream.unordered 操作。

注意:这个示例返回 ConcurrentMap 而不是 Map 的实例,并调用 groupingByConcurrent 操作而不是 groupingBy ,与 groupingByConcurrent 操作不同,并行流的 groupingBy 操作执行得很差(这是因为它通过按键合并两个映射来操作,这在计算上非常昂贵),类似地, Collectors.toConcurrentMap 操作在并行流中比 Collectors.toMap 操作执行得更好。

排序

管道处理流元素的顺序取决于流是串行执行还是并行执行、流的源和中间操作,例如,考虑下面的例子,它使用 forEach 操作多次打印 ArrayList 实例的元素:

Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8 };
List<Integer> listOfIntegers =
    new ArrayList<>(Arrays.asList(intArray));

System.out.println("listOfIntegers:");
listOfIntegers
    .stream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");

System.out.println("listOfIntegers sorted in reverse order:");
Comparator<Integer> normal = Integer::compare;
Comparator<Integer> reversed = normal.reversed(); 
Collections.sort(listOfIntegers, reversed);  
listOfIntegers
    .stream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");
     
System.out.println("Parallel stream");
listOfIntegers
    .parallelStream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");
    
System.out.println("Another parallel stream:");
listOfIntegers
    .parallelStream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");
     
System.out.println("With forEachOrdered:");
listOfIntegers
    .parallelStream()
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");

这个例子由五个管道组成,打印输出如下:

listOfIntegers:
1 2 3 4 5 6 7 8
listOfIntegers sorted in reverse order:
8 7 6 5 4 3 2 1
Parallel stream:
3 4 1 6 2 5 7 8
Another parallel stream:
6 3 1 5 7 8 4 2
With forEachOrdered:
8 7 6 5 4 3 2 1

这个示例执行以下操作:

  • 第一个管道按添加到列表中的顺序打印列表 listOfIntegers 的元素。
  • 第二个管道按照方法 Collections.sort 排序后打印 listOfIntegers 的元素。
  • 第三和第四个管道以一种明显随机的顺序打印列表中的元素,请记住,当处理流的元素时,流操作使用内部迭代,因此,当并行执行流时,Java编译器和运行时将确定处理流元素的顺序,以最大化并行计算的好处,除非流操作另有规定。
  • 第五个管道使用 forEachOrdered 方法,它按照源指定的顺序处理流的元素,无论你是串行执行流还是并行执行流,请注意,如果使用类似 forEachOrdered 的并行流操作,可能会失去并行性的好处。

副作用

方法或表达式除了返回或生成值外,如果还修改计算机的状态,则会产生副作用。例如可变归纳,以及调用 System.out.println 方法用于调试。JDK可以很好地处理管道中的某些副作用,特别地, collect 方法被设计用于以并行安全的方式执行最常见的流操作,这些操作具有副作用,像 forEachpeek 这样的操作是为副作用而设计的,返回 void 的lambda表达式,例如调用 System.out.println 的表达式,除了副作用什么都做不了。即便如此,你也应该小心使用 forEachpeek 操作,如果你将这些操作中的一个操作与并行流一起使用,那么Java运行时可能会从多个线程并发地调用你指定为其参数的lambda表达式。此外,永远不要作为参数传递lambda表达式,这些表达式在 filtermap 等操作中有副作用,下面几节讨论干扰和有状态lambda表达式,它们都可能是副作用的来源,并且可能返回不一致或不可预测的结果,特别是在并行流中。然而,惰性的概念首先被讨论,因为它对干扰有直接的影响。

惰性

所有中间操作都是惰性的,如果表达式、方法或算法的值只在需要时计算,那么它就是惰性的(如果一个算法被立即计算或处理,那么它就是立即的),中间操作是惰性的,因为它们直到终端操作开始时才开始处理流的内容。延迟处理流使Java编译器和运行时能够优化它们处理流的方式,例如,在管道中,如聚合操作一节中描述的 filter-mapToInt-average 示例, average 操作可以从 mapToInt 操作创建的流中获取几个整数, mapToInt 操作从 filter 操作中获取元素。 average 操作将重复这个过程,直到从流中获得所有需要的元素,然后计算平均值。

干扰

流操作中的Lambda表达式不应该干涉,当管道处理流时修改流的源时发生干扰,例如,下面的代码尝试连接列表 listofstring 中包含的字符串,但是,它抛出一个 ConcurrentModificationException

try {
    List<String> listOfStrings =
        new ArrayList<>(Arrays.asList("one", "two"));
         
    // This will fail as the peek operation will attempt to add the
    // string "three" to the source after the terminal operation has
    // commenced. 
             
    String concatenatedString = listOfStrings
        .stream()
        
        // Don't do this! Interference occurs here.
        .peek(s -> listOfStrings.add("three"))
        
        .reduce((a, b) -> a + " " + b)
        .get();
                 
    System.out.println("Concatenated string: " + concatenatedString);
         
} catch (Exception e) {
    System.out.println("Exception caught: " + e.toString());
}

这个示例使用 reduce 操作将 listofstring 中包含的字符串连接到一个可选的 String 值中, reduce 操作是一个终端操作,但是,这里的管道调用中间操作 peek ,该操作试图向 listofstring 添加一个新元素。记住,所有中间操作都是惰性的,这意味着本例中的管道在调用操作 get 时开始执行,在执行 get 操作完成时结束执行, peek 操作的参数试图在管道执行期间修改流源,这将导致Java运行时抛出 ConcurrentModificationException

有状态的Lambda表达式

避免在流操作中使用有状态lambda表达式作为参数,有状态lambda表达式的结果取决于在管道执行期间可能发生变化的任何状态,下面的示例使用 map 中间操作将列表 listOfIntegers 中的元素添加到一个新的 List 实例中,它这样做了两次,第一次是串行流,然后是并行流:

List<Integer> serialStorage = new ArrayList<>();
     
System.out.println("Serial stream:");
listOfIntegers
    .stream()
    
    // Don't do this! It uses a stateful lambda expression.
    .map(e -> { serialStorage.add(e); return e; })
    
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");
     
serialStorage
    .stream()
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");

System.out.println("Parallel stream:");
List<Integer> parallelStorage = Collections.synchronizedList(
    new ArrayList<>());
listOfIntegers
    .parallelStream()
    
    // Don't do this! It uses a stateful lambda expression.
    .map(e -> { parallelStorage.add(e); return e; })
    
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");
     
parallelStorage
    .stream()
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");

lambda表达式 e -> { parallelStorage.add(e); return e; } 是一个有状态的lambda表达式,每次运行代码时,它的结果都可能不同,这个例子打印了以下内容:

Serial stream:
8 7 6 5 4 3 2 1
8 7 6 5 4 3 2 1
Parallel stream:
8 7 6 5 4 3 2 1
1 3 6 2 4 5 8 7

forEachOrdered 操作按照流指定的顺序处理元素,无论流是串行执行还是并行执行,但是,当并行执行流时, map 操作处理Java运行时和编译器指定的流的元素,因此,lambda表达式 e -> { parallelStorage.add(e); return e; } 每次运行代码时,向列表中添加元素的 parallelStorage 都会发生变化,对于确定性和可预测的结果,确保流操作中的lambda表达式参数不是有状态的。

注意:这个例子调用了 synchronizedList 方法,因此列表 parallelStorage 是线程安全的,记住集合不是线程安全的,这意味着多个线程不应该同时访问特定的集合,假设你在创建 parallelStorage 时没有调用 synchronizedList 方法:

List<Integer> parallelStorage = new ArrayList<>();

这个例子的行为是不规律的,因为多线程访问和修改 parallelStorage 时,没有同步之类的机制来调度特定线程何时可以访问 List 实例,因此,该示例可以打印如下输出:

Parallel stream:
8 7 6 5 4 3 2 1
null 3 5 4 7 8 1 2

上一篇:聚合归纳操作

原文  https://segmentfault.com/a/1190000019870290
正文到此结束
Loading...