这些Java教程是为JDK 8编写的。本页中描述的示例和实践不利用后续版本中引入的改进,可能使用不再可用的技术。
请参阅Java语言更改,了解Java SE 9及后续版本中更新的语言功能的摘要。
请参阅JDK发行说明,了解所有JDK版本的新功能、增强功能以及已删除或弃用选项的信息。
并行计算涉及将问题分解为子问题,同时解决这些问题(并行地,每个子问题在单独的线程中运行),然后将子问题的解的结果合并起来。Java SE提供了fork/join框架,可以更轻松地在应用程序中实现并行计算。但是,使用此框架,您必须指定如何对问题进行细分(分区)。通过聚合操作,Java运行时为您执行此细分和解的结果的合并。
在使用集合的应用程序中实现并行化的一个困难是集合不是线程安全的,这意味着多个线程不能在没有引入线程干扰或内存一致性错误的情况下操作集合。集合框架提供同步包装器,可以将自动同步添加到任意集合中,使其线程安全。但是,同步会引入线程争用。您要避免线程争用,因为它会阻止线程并行运行。通过聚合操作和并行流,您可以使用非线程安全的集合实现并行性,前提是在对其进行操作时不修改集合。
请注意,并行性不一定比串行执行操作更快,尽管在有足够的数据和处理器核心的情况下可以更快。虽然聚合操作可以更轻松地实现并行性,但是确定应用程序是否适合并行性仍然是您的责任。
本节涵盖以下主题:
您可以在示例ParallelismExamples
中找到本节中描述的代码片段。
您可以在串行或并行中执行流。当流在并行中执行时,Java运行时将流分割为多个子流。聚合操作并行迭代和处理这些子流,然后将结果合并。
创建流时,默认是串行流,除非另有规定。要创建并行流,请调用操作Collection.parallelStream
。或者,调用操作BaseStream.parallel
。例如,以下语句并行计算所有男性成员的平均年龄:
double average = roster .parallelStream() .filter(p -> p.getGender() == Person.Sex.MALE) .mapToInt(Person::getAge) .average() .getAsDouble();
考虑以下示例(在归约部分中描述)以性别分组成员。此示例调用collect
操作,将集合roster
归约为Map
:
Map<Person.Sex, List<Person>> byGender = roster .stream() .collect( Collectors.groupingBy(Person::getGender));
以下是并行等效:
ConcurrentMap<Person.Sex, List<Person>> byGender = roster .parallelStream() .collect( Collectors.groupingByConcurrent(Person::getGender));
这被称为并发归约。如果对于包含collect
操作的特定流管道,满足以下所有条件,Java运行时会执行并发归约:
collect
操作的参数,也就是收集器,具有特性Collector.Characteristics.CONCURRENT
。要确定收集器的特性,调用Collector.characteristics
方法。Collector.Characteristics.UNORDERED
。要确保流是无序的,请调用BaseStream.unordered
操作。注意:此示例返回的是ConcurrentMap
的实例,而不是Map
,并调用了groupingByConcurrent
操作而不是groupingBy
操作。 (有关ConcurrentMap
的更多信息,请参见Concurrent Collections部分。)与操作groupingByConcurrent
不同,操作groupingBy
在并行流中的性能较差。 (这是因为它通过键合并两个映射,这在计算上是昂贵的。)类似地,操作Collectors.toConcurrentMap
在并行流中的性能比操作Collectors.toMap
要好。
流处理元素的顺序取决于流是以串行还是并行方式执行、流的来源以及中间操作。例如,考虑以下示例,它多次使用forEach
操作打印ArrayList
实例的元素:
Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8 }; List<Integer> listOfIntegers = new ArrayList<>(Arrays.asList(intArray)); System.out.println("listOfIntegers:"); listOfIntegers .stream() .forEach(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("listOfIntegers 按照逆序排序:"); Comparator<Integer> normal = Integer::compare; Comparator<Integer> reversed = normal.reversed(); Collections.sort(listOfIntegers, reversed); listOfIntegers .stream() .forEach(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("并行流"); listOfIntegers .parallelStream() .forEach(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("另一个并行流:"); listOfIntegers .parallelStream() .forEach(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("使用forEachOrdered:"); listOfIntegers .parallelStream() .forEachOrdered(e -> System.out.print(e + " ")); System.out.println("");
该示例包含五个流水线。它打印类似以下的输出:
listOfIntegers: 1 2 3 4 5 6 7 8 以相反顺序排序的listOfIntegers: 8 7 6 5 4 3 2 1 并行流: 3 4 1 6 2 5 7 8 另一个并行流: 6 3 1 5 7 8 4 2 使用forEachOrdered: 8 7 6 5 4 3 2 1
这个示例执行以下操作:
listOfIntegers
中的元素,按照它们添加到列表的顺序。Collections.sort
对listOfIntegers
排序后打印元素。forEachOrdered
方法,它按照源指定的顺序处理流的元素,无论你是以串行还是并行方式执行流。请注意,如果你在并行流中使用forEachOrdered
等操作,可能会失去并行性的好处。如果一个方法或表达式除了返回或产生一个值之外,还修改计算机的状态,那么它具有副作用。例如,可变规约(使用collect
操作的操作;请参阅规约部分了解更多信息)以及调用System.out.println
方法进行调试。JDK能够很好地处理某些副作用。特别是,collect
方法被设计为以并行安全的方式执行具有副作用的最常见的流操作。像forEach
和peek
这样的操作专为副作用而设计;返回void的lambda表达式,比如调用System.out.println
的表达式,除了具有副作用之外无法做任何事情。即便如此,你应该谨慎使用forEach
和peek
操作;如果你在并行流中使用其中之一,Java运行时可能会从多个线程并发地调用你指定的lambda表达式作为其参数。此外,永远不要在filter
和map
等操作中传递具有副作用的lambda表达式作为参数。接下来的几节将讨论干扰和有状态的lambda表达式,它们都可能是副作用的来源,并且可能返回不一致或不可预测的结果,特别是在并行流中。不过,首先讨论惰性求值的概念,因为它直接影响到干扰。
所有的中间操作都是惰性求值的。如果一个表达式、方法或者算法的值只有在需要的时候才会被计算,那么它就是惰性的。(如果一个算法在立即被计算或处理,那么它就是急切的。)中间操作是惰性的,因为它们在终端操作开始之前不会开始处理流的内容。惰性处理流使得Java编译器和运行时能够优化它们处理流的方式。例如,在像 filter
-mapToInt
-average
这样的管道中,average
操作可以从 mapToInt
操作创建的流中获取前几个整数,而 mapToInt
操作则从 filter
操作获取元素。 average
操作会重复这个过程,直到它从流中获取到所有所需的元素,然后计算平均值。
流操作中的Lambda表达式不应该干扰。当流的源在管道处理流时被修改时,就会发生干扰。例如,下面的代码尝试将包含在 List
listOfStrings
中的字符串连接起来。然而,它抛出了一个 ConcurrentModificationException
异常:
try { List<String> listOfStrings = new ArrayList<>(Arrays.asList("one", "two")); // 这将会失败,因为 peek 操作在终端操作开始后尝试将字符串 "three" 添加到源中。 String concatenatedString = listOfStrings .stream() // 不要这样做!这里发生了干扰。 .peek(s -> listOfStrings.add("three")) .reduce((a, b) -> a + " " + b) .get(); System.out.println("连接后的字符串: " + concatenatedString); } catch (Exception e) { System.out.println("捕获到异常: " + e.toString()); }
这个例子将 listOfStrings
中的字符串使用 reduce
操作连接成一个 Optional<String>
值,这是一个终端操作。然而,这个管道调用了中间操作 peek
,它尝试向 listOfStrings
添加一个新元素。记住,所有的中间操作都是惰性的。这意味着在这个例子中,管道的执行在调用 get
操作时开始,并在 get
操作完成时结束。 peek
操作的参数试图在管道执行期间修改流源,这导致Java运行时抛出一个 ConcurrentModificationException
异常。
在流操作中避免使用有状态的Lambda表达式作为参数。有状态的Lambda表达式是指其结果依赖于在流执行过程中可能发生变化的任何状态。以下示例使用map
中间操作将List
listOfIntegers
中的元素添加到新的List
实例中。它先使用串行流,然后再使用并行流:
List<Integer> serialStorage = new ArrayList<>(); System.out.println("串行流:"); listOfIntegers .stream() // 不要这样做!它使用了有状态的Lambda表达式。 .map(e -> { serialStorage.add(e); return e; }) .forEachOrdered(e -> System.out.print(e + " ")); System.out.println(""); serialStorage .stream() .forEachOrdered(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("并行流:"); List<Integer> parallelStorage = Collections.synchronizedList( new ArrayList<>()); listOfIntegers .parallelStream() // 不要这样做!它使用了有状态的Lambda表达式。 .map(e -> { parallelStorage.add(e); return e; }) .forEachOrdered(e -> System.out.print(e + " ")); System.out.println(""); parallelStorage .stream() .forEachOrdered(e -> System.out.print(e + " ")); System.out.println("");
lambda表达式 e -> { parallelStorage.add(e); return e; }
是一个有状态的Lambda表达式。它的结果每次运行代码时都可能不同。此示例输出以下结果:
串行流: 8 7 6 5 4 3 2 1 8 7 6 5 4 3 2 1 并行流: 8 7 6 5 4 3 2 1 1 3 6 2 4 5 8 7
forEachOrdered
操作按照流指定的顺序处理元素,无论流是串行还是并行执行。然而,当流以并行方式执行时,map
操作会根据Java运行时和编译器指定的流中的元素进行处理。因此,lambda表达式 e -> { parallelStorage.add(e); return e; }
向List
parallelStorage
添加元素的顺序每次运行代码时都可能不同。为了获得确定性和可预测的结果,请确保流操作中的lambda表达式参数不是有状态的。
注意:此示例调用了方法synchronizedList
,以使List
parallelStorage
是线程安全的。请记住,集合不是线程安全的。这意味着多个线程不应同时访问同一个集合。假设在创建parallelStorage
时没有调用方法synchronizedList
:
List<Integer> parallelStorage = new ArrayList<>();
这个示例表现不稳定,因为多个线程在没有像同步这样的机制来安排特定线程访问 List
实例时,访问和修改了 parallelStorage
。因此,该示例可能会打印类似以下输出:
并行流: 8 7 6 5 4 3 2 1 null 3 5 4 7 8 1 2