文档

Java™教程
隐藏目录
并行性
指南: 集合
教程: 聚合操作

并行性

并行计算涉及将问题分解为子问题,同时解决这些问题(并行地,每个子问题在单独的线程中运行),然后将子问题的解的结果合并起来。Java SE提供了fork/join框架,可以更轻松地在应用程序中实现并行计算。但是,使用此框架,您必须指定如何对问题进行细分(分区)。通过聚合操作,Java运行时为您执行此细分和解的结果的合并。

在使用集合的应用程序中实现并行化的一个困难是集合不是线程安全的,这意味着多个线程不能在没有引入线程干扰内存一致性错误的情况下操作集合。集合框架提供同步包装器,可以将自动同步添加到任意集合中,使其线程安全。但是,同步会引入线程争用。您要避免线程争用,因为它会阻止线程并行运行。通过聚合操作和并行流,您可以使用非线程安全的集合实现并行性,前提是在对其进行操作时不修改集合。

请注意,并行性不一定比串行执行操作更快,尽管在有足够的数据和处理器核心的情况下可以更快。虽然聚合操作可以更轻松地实现并行性,但是确定应用程序是否适合并行性仍然是您的责任。

本节涵盖以下主题:

您可以在示例ParallelismExamples中找到本节中描述的代码片段。

在并行中执行流

您可以在串行或并行中执行流。当流在并行中执行时,Java运行时将流分割为多个子流。聚合操作并行迭代和处理这些子流,然后将结果合并。

创建流时,默认是串行流,除非另有规定。要创建并行流,请调用操作Collection.parallelStream。或者,调用操作BaseStream.parallel。例如,以下语句并行计算所有男性成员的平均年龄:

double average = roster
    .parallelStream()
    .filter(p -> p.getGender() == Person.Sex.MALE)
    .mapToInt(Person::getAge)
    .average()
    .getAsDouble();

并发归约

考虑以下示例(在归约部分中描述)以性别分组成员。此示例调用collect操作,将集合roster归约为Map

Map<Person.Sex, List<Person>> byGender =
    roster
        .stream()
        .collect(
            Collectors.groupingBy(Person::getGender));

以下是并行等效:

ConcurrentMap<Person.Sex, List<Person>> byGender =
    roster
        .parallelStream()
        .collect(
            Collectors.groupingByConcurrent(Person::getGender));

这被称为并发归约。如果对于包含collect操作的特定流管道,满足以下所有条件,Java运行时会执行并发归约:

注意:此示例返回的是ConcurrentMap的实例,而不是Map,并调用了groupingByConcurrent操作而不是groupingBy操作。 (有关ConcurrentMap的更多信息,请参见Concurrent Collections部分。)与操作groupingByConcurrent不同,操作groupingBy在并行流中的性能较差。 (这是因为它通过键合并两个映射,这在计算上是昂贵的。)类似地,操作Collectors.toConcurrentMap在并行流中的性能比操作Collectors.toMap要好。

顺序

流处理元素的顺序取决于流是以串行还是并行方式执行、流的来源以及中间操作。例如,考虑以下示例,它多次使用forEach操作打印ArrayList实例的元素:

Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8 };
List<Integer> listOfIntegers =
    new ArrayList<>(Arrays.asList(intArray));

System.out.println("listOfIntegers:");
listOfIntegers
    .stream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");

System.out.println("listOfIntegers 按照逆序排序:");
Comparator<Integer> normal = Integer::compare;
Comparator<Integer> reversed = normal.reversed(); 
Collections.sort(listOfIntegers, reversed);  
listOfIntegers
    .stream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");
     
System.out.println("并行流");
listOfIntegers
    .parallelStream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");
    
System.out.println("另一个并行流:");
listOfIntegers
    .parallelStream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");
     
System.out.println("使用forEachOrdered:");
listOfIntegers
    .parallelStream()
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");

该示例包含五个流水线。它打印类似以下的输出:

listOfIntegers:
1 2 3 4 5 6 7 8
以相反顺序排序的listOfIntegers:
8 7 6 5 4 3 2 1
并行流:
3 4 1 6 2 5 7 8
另一个并行流:
6 3 1 5 7 8 4 2
使用forEachOrdered:
8 7 6 5 4 3 2 1

这个示例执行以下操作:

副作用

如果一个方法或表达式除了返回或产生一个值之外,还修改计算机的状态,那么它具有副作用。例如,可变规约(使用collect操作的操作;请参阅规约部分了解更多信息)以及调用System.out.println方法进行调试。JDK能够很好地处理某些副作用。特别是,collect方法被设计为以并行安全的方式执行具有副作用的最常见的流操作。像forEachpeek这样的操作专为副作用而设计;返回void的lambda表达式,比如调用System.out.println的表达式,除了具有副作用之外无法做任何事情。即便如此,你应该谨慎使用forEachpeek操作;如果你在并行流中使用其中之一,Java运行时可能会从多个线程并发地调用你指定的lambda表达式作为其参数。此外,永远不要在filtermap等操作中传递具有副作用的lambda表达式作为参数。接下来的几节将讨论干扰有状态的lambda表达式,它们都可能是副作用的来源,并且可能返回不一致或不可预测的结果,特别是在并行流中。不过,首先讨论惰性求值的概念,因为它直接影响到干扰。

惰性求值

所有的中间操作都是惰性求值的。如果一个表达式、方法或者算法的值只有在需要的时候才会被计算,那么它就是惰性的。(如果一个算法在立即被计算或处理,那么它就是急切的。)中间操作是惰性的,因为它们在终端操作开始之前不会开始处理流的内容。惰性处理流使得Java编译器和运行时能够优化它们处理流的方式。例如,在像 filter-mapToInt-average 这样的管道中,average 操作可以从 mapToInt 操作创建的流中获取前几个整数,而 mapToInt 操作则从 filter 操作获取元素。 average 操作会重复这个过程,直到它从流中获取到所有所需的元素,然后计算平均值。

干扰

流操作中的Lambda表达式不应该干扰。当流的源在管道处理流时被修改时,就会发生干扰。例如,下面的代码尝试将包含在 List listOfStrings 中的字符串连接起来。然而,它抛出了一个 ConcurrentModificationException 异常:

try {
    List<String> listOfStrings =
        new ArrayList<>(Arrays.asList("one", "two"));
         
    // 这将会失败,因为 peek 操作在终端操作开始后尝试将字符串 "three" 添加到源中。
             
    String concatenatedString = listOfStrings
        .stream()
        
        // 不要这样做!这里发生了干扰。
        .peek(s -> listOfStrings.add("three"))
        
        .reduce((a, b) -> a + " " + b)
        .get();
                 
    System.out.println("连接后的字符串: " + concatenatedString);
         
} catch (Exception e) {
    System.out.println("捕获到异常: " + e.toString());
}

这个例子将 listOfStrings 中的字符串使用 reduce 操作连接成一个 Optional<String> 值,这是一个终端操作。然而,这个管道调用了中间操作 peek,它尝试向 listOfStrings 添加一个新元素。记住,所有的中间操作都是惰性的。这意味着在这个例子中,管道的执行在调用 get 操作时开始,并在 get 操作完成时结束。 peek 操作的参数试图在管道执行期间修改流源,这导致Java运行时抛出一个 ConcurrentModificationException 异常。

有状态的Lambda表达式

在流操作中避免使用有状态的Lambda表达式作为参数。有状态的Lambda表达式是指其结果依赖于在流执行过程中可能发生变化的任何状态。以下示例使用map中间操作将List listOfIntegers中的元素添加到新的List实例中。它先使用串行流,然后再使用并行流:

List<Integer> serialStorage = new ArrayList<>();
     
System.out.println("串行流:");
listOfIntegers
    .stream()
    
    // 不要这样做!它使用了有状态的Lambda表达式。
    .map(e -> { serialStorage.add(e); return e; })
    
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");
     
serialStorage
    .stream()
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");

System.out.println("并行流:");
List<Integer> parallelStorage = Collections.synchronizedList(
    new ArrayList<>());
listOfIntegers
    .parallelStream()
    
    // 不要这样做!它使用了有状态的Lambda表达式。
    .map(e -> { parallelStorage.add(e); return e; })
    
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");
     
parallelStorage
    .stream()
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");

lambda表达式 e -> { parallelStorage.add(e); return e; } 是一个有状态的Lambda表达式。它的结果每次运行代码时都可能不同。此示例输出以下结果:

串行流:
8 7 6 5 4 3 2 1
8 7 6 5 4 3 2 1
并行流:
8 7 6 5 4 3 2 1
1 3 6 2 4 5 8 7

forEachOrdered操作按照流指定的顺序处理元素,无论流是串行还是并行执行。然而,当流以并行方式执行时,map操作会根据Java运行时和编译器指定的流中的元素进行处理。因此,lambda表达式 e -> { parallelStorage.add(e); return e; }List parallelStorage添加元素的顺序每次运行代码时都可能不同。为了获得确定性和可预测的结果,请确保流操作中的lambda表达式参数不是有状态的。

注意:此示例调用了方法synchronizedList,以使List parallelStorage是线程安全的。请记住,集合不是线程安全的。这意味着多个线程不应同时访问同一个集合。假设在创建parallelStorage时没有调用方法synchronizedList

List<Integer> parallelStorage = new ArrayList<>();

这个示例表现不稳定,因为多个线程在没有像同步这样的机制来安排特定线程访问 List 实例时,访问和修改了 parallelStorage。因此,该示例可能会打印类似以下输出:

并行流:
8 7 6 5 4 3 2 1
null 3 5 4 7 8 1 2

上一页: Redution
下一页: 问题和练习:聚合操作