- 类型参数:
-
T
- 完成者结果的类型
- 所有已实现的接口:
-
Serializable
,Future<T>
ForkJoinTask
。CountedCompleters在子任务停滞和阻塞的情况下通常比其他形式的ForkJoinTasks更健壮,但编程起来不太直观。CountedCompleter的用法类似于其他基于完成的组件(例如CompletionHandler
),不同之处在于可能需要多个待处理完成才能触发完成操作onCompletion(CountedCompleter)
,而不仅仅是一个。除非另有初始化,待处理计数从零开始,但可以使用方法setPendingCount(int)
、addToPendingCount(int)
和compareAndSetPendingCount(int, int)
(原子方式)进行更改。在调用tryComplete()
时,如果待处理操作计数不为零,则递减;否则,执行完成操作,如果此Completer本身有一个Completer,则继续使用其Completer进行进程。与诸如Phaser
和Semaphore
等相关同步组件一样,这些方法仅影响内部计数;它们不会建立任何进一步的内部簿记。特别是,不会维护待处理任务的身份。如下所示,您可以创建子类,在需要时记录一些或所有待处理任务或其结果。如下所示,还提供了支持自定义完成遍历的实用方法。但是,由于CountedCompleters仅提供基本的同步机制,因此可能有必要创建进一步的抽象子类,以维护适用于一组相关用法的链接、字段和其他支持方法。
具体的CountedCompleter类必须定义方法compute()
,在大多数情况下(如下所示),在返回之前应调用tryComplete()
一次。该类还可以选择重写方法onCompletion(CountedCompleter)
以在正常完成时执行操作,并重写方法onExceptionalCompletion(Throwable, CountedCompleter)
以在任何异常时执行操作。
CountedCompleters通常不返回结果,这种情况下它们通常声明为CountedCompleter<Void>
,并且作为结果值始终返回null
。在其他情况下,您应该重写方法getRawResult()
以从join(), invoke()
和相关方法中提供结果。一般来说,此方法应返回CountedCompleter对象的一个字段(或一个或多个字段的函数)的值,该字段在完成时保存结果。默认情况下,方法setRawResult(T)
在CountedCompleters中不起作用。可以重写此方法以维护保存结果数据的其他对象或字段,但这种情况很少适用。
一个没有自己的Completer(即,getCompleter()
返回null
)的CountedCompleter可以作为具有此附加功能的常规ForkJoinTask使用。但是,任何具有另一个Completer的Completer仅用作其他计算的内部辅助程序,因此其自身任务状态(如ForkJoinTask.isDone()
等方法中报告的)是任意的;此状态仅在显式调用complete(T)
、ForkJoinTask.cancel(boolean)
、ForkJoinTask.completeExceptionally(Throwable)
或在方法compute
的异常完成时才会更改。在任何异常完成时,如果存在任务的Completer(及其Completer等),则可能将异常传递给任务的Completer。类似地,取消内部CountedCompleter仅对该Completer产生局部影响,因此通常不太有用。
示例用法。
并行递归分解。 CountedCompleters可以组成树,类似于通常与RecursiveAction
一起使用的树,尽管设置它们的构造通常有所不同。在这里,每个任务的Completer是其在计算树中的父级。即使它们需要更多的簿记,但在将可能耗时的操作(无法进一步细分)应用于数组或集合的每个元素时,CountedCompleters可能是更好的选择;特别是当操作对某些元素的完成时间与其他元素明显不同时,要么是因为固有变化(例如I/O)要么是因为辅助效果,如垃圾回收。由于CountedCompleters提供自己的延续,其他任务无需阻塞等待执行它们。
例如,这是一个使用二分递归分解将工作分成单个片段(叶子任务)的实用方法的初始版本。即使工作被分成单独的调用,树状技术通常优于直接分叉叶子任务,因为它们减少了线程间通信并改善了负载平衡。在递归情况下,每对子任务中的第二个完成的任务会触发其父任务的完成(因为不执行结果组合,所以不会重写方法onCompletion
的默认空操作实现)。该实用方法设置根任务并调用它(这里,隐式使用ForkJoinPool.commonPool()
)。始终将待处理计数设置为子任务的数量并在返回之前立即调用tryComplete()
是直截了当且可靠的(但不是最佳的)。
public static <E> void forEach(E[] array, Consumer<E> action) {
class Task extends CountedCompleter<Void> {
final int lo, hi;
Task(Task parent, int lo, int hi) {
super(parent); this.lo = lo; this.hi = hi;
}
public void compute() {
if (hi - lo >= 2) {
int mid = (lo + hi) >>> 1;
// 必须在分叉之前设置待处理计数
setPendingCount(2);
new Task(this, mid, hi).fork(); // 右子任务
new Task(this, lo, mid).fork(); // 左子任务
}
else if (hi > lo)
action.accept(array[lo]);
tryComplete();
}
}
new Task(null, 0, array.length).invoke();
}
通过注意到在递归情况下,任务在分叉其右任务后没有其他操作可执行,因此可以在返回之前直接调用其左任务来改进此设计。 (这类似于尾递归消除。)此外,当任务中的最后一个操作是分叉或调用子任务(“尾调用”)时,可以优化掉对tryComplete()
的调用,代价是使待处理计数看起来“少一个”。
public void compute() {
if (hi - lo >= 2) {
int mid = (lo + hi) >>> 1;
setPendingCount(1); // 看起来少一个,但是正确的!
new Task(this, mid, hi).fork(); // 右子任务
new Task(this, lo, mid).compute(); // 直接调用
} else {
if (hi > lo)
action.accept(array[lo]);
tryComplete();
}
}
进一步优化时,注意到左任务甚至不需要存在。我们可以继续使用原始任务,为每个分叉添加一个待处理计数。此外,因为此树中的任何任务都不实现onCompletion(CountedCompleter)
方法,tryComplete
可以替换为propagateCompletion()
。
public void compute() {
int n = hi - lo;
for (; n >= 2; n /= 2) {
addToPendingCount(1);
new Task(this, lo + n/2, lo + n).fork();
}
if (n > 0)
action.accept(array[lo]);
propagateCompletion();
}
当待处理计数可以预先计算时,它们可以在构造函数中建立:
public static <E> void forEach(E[] array, Consumer<E> action) {
class Task extends CountedCompleter<Void> {
final int lo, hi;
Task(Task parent, int lo, int hi) {
super(parent, 31 - Integer.numberOfLeadingZeros(hi - lo));
this.lo = lo; this.hi = hi;
}
public void compute() {
for (int n = hi - lo; n >= 2; n /= 2)
new Task(this, lo + n/2, lo + n).fork();
action.accept(array[lo]);
propagateCompletion();
}
}
if (array.length > 0)
new Task(null, 0, array.length).invoke();
}
对这类的进一步优化可能包括为叶步骤专门化类,每次迭代细分四个而不是两个,并使用自适应阈值而不总是细分到单个元素。
搜索。 CountedCompleters的树可以在数据结构的不同部分搜索值或属性,并在找到一个值时立即在AtomicReference
中报告结果。其他任务可以轮询结果以避免不必要的工作。(您还可以取消其他任务,但通常更简单和更有效的方法是让它们注意到结果已设置,如果是,则跳过进一步处理。)再次以使用完全分区的数组为例说明(在实践中,叶子任务几乎总是会处理多个元素):
class Searcher<E> extends CountedCompleter<E> {
final E[] array; final AtomicReference<E> result; final int lo, hi;
Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result, int lo, int hi) {
super(p);
this.array = array; this.result = result; this.lo = lo; this.hi = hi;
}
public E getRawResult() { return result.get(); }
public void compute() { // 类似于ForEach版本3
int l = lo, h = hi;
while (result.get() == null && h >= l) {
if (h - l >= 2) {
int mid = (l + h) >>> 1;
addToPendingCount(1);
new Searcher(this, array, result, mid, h).fork();
h = mid;
}
else {
E x = array[l];
if (matches(x) && result.compareAndSet(null, x))
quietlyCompleteRoot(); // 根任务现在可以加入
break;
}
}
tryComplete(); // 通常完成,无论是否找到
}
boolean matches(E e) { ... } // 如果找到返回true
public static <E> E search(E[] array) {
return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke();
}
}
在这个例子中,以及其他任务除了compareAndSet
一个共同结果之外没有其他影响的情况下,tryComplete
的尾部无条件调用可以被改为有条件的(if (result.get() == null) tryComplete();
),因为一旦根任务完成,就不需要进一步的记录来管理完成。
记录子任务。 CountedCompleter任务通常需要在方法onCompletion(CountedCompleter)
中访问多个子任务的结果。如下面的类所示(执行简化形式的映射-归约,其中映射和归约都是类型为E
),在分治设计中实现这一点的一种方法是让每个子任务记录其兄弟,以便在方法onCompletion
中访问。这种技术适用于归约,其中组合左右结果的顺序无关紧要;有序归约需要明确的左/右指定。其他示例中看到的其他简化的变体也可能适用。
class MyMapper<E> { E apply(E v) { ... } }
class MyReducer<E> { E apply(E x, E y) { ... } }
class MapReducer<E> extends CountedCompleter<E> {
final E[] array; final MyMapper<E> mapper;
final MyReducer<E> reducer; final int lo, hi;
MapReducer<E> sibling;
E result;
MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
MyReducer<E> reducer, int lo, int hi) {
super(p);
this.array = array; this.mapper = mapper;
this.reducer = reducer; this.lo = lo; this.hi = hi;
}
public void compute() {
if (hi - lo >= 2) {
int mid = (lo + hi) >>> 1;
MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
left.sibling = right;
right.sibling = left;
setPendingCount(1); // 只有right是待处理的
right.fork();
left.compute(); // 直接执行left
}
else {
if (hi > lo)
result = mapper.apply(array[lo]);
tryComplete();
}
}
public void onCompletion(CountedCompleter<?> caller) {
if (caller != this) {
MapReducer<E> child = (MapReducer<E>)caller;
MapReducer<E> sib = child.sibling;
if (sib == null || sib.result == null)
result = child.result;
else
result = reducer.apply(child.result, sib.result);
}
}
public E getRawResult() { return result; }
public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
return new MapReducer<E>(null, array, mapper, reducer,
0, array.length).invoke();
}
}
这里,方法onCompletion
采用了许多组合结果的完成设计中常见的形式。这种回调式方法在每个任务完成时触发一次,在两种不同的上下文中,即待处理计数为零时,或者变为零时:(1)由任务本身触发,如果在调用tryComplete
时其待处理计数为零,或者(2)由任何子任务触发,当它们完成并将待处理计数减少为零时。 caller
参数区分情况。通常情况下,当调用者是this
时,不需要采取任何操作。否则,可以使用调用者参数(通常通过转换)提供一个值(和/或链接到其他值)以进行组合。假设正确使用待处理计数,在onCompletion
内部的操作(一次)在任务及其子任务完成时发生。在此方法内部不需要额外的同步来确保对该任务或其他已完成任务的字段的访问的线程安全性。
完成遍历。 如果使用onCompletion处理完成不适用或不方便,可以使用方法firstComplete()
和nextComplete()
创建自定义遍历。例如,要定义一个只拆分右侧任务的MapReducer,形式如第三个ForEach示例,完成必须协作沿着未耗尽的子任务链接进行归约,可以这样做:
class MapReducer<E> extends CountedCompleter<E> { // 版本2
final E[] array; final MyMapper<E> mapper;
final MyReducer<E> reducer; final int lo, hi;
MapReducer<E> forks, next; // 记录子任务分叉在列表中
E result;
MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) {
super(p);
this.array = array; this.mapper = mapper;
this.reducer = reducer; this.lo = lo; this.hi = hi;
this.next = next;
}
public void compute() {
int l = lo, h = hi;
while (h - l >= 2) {
int mid = (l + h) >>> 1;
addToPendingCount(1);
(forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork();
h = mid;
}
if (h > l)
result = mapper.apply(array[l]);
// 通过沿着和推进子任务链接进行归约来处理完成
for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) {
for (MapReducer t = (MapReducer)c, s = t.forks; s != null; s = t.forks = s.next)
t.result = reducer.apply(t.result, s.result);
}
}
public E getRawResult() { return result; }
public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
return new MapReducer<E>(null, array, mapper, reducer,
0, array.length, null).invoke();
}
}
触发器。 有些CountedCompleters本身从不被分叉,而是作为其他设计中的管道的一部分;包括其中一个异步任务的完成触发另一个异步任务的情况。例如:
class HeaderBuilder extends CountedCompleter<...> { ... }
class BodyBuilder extends CountedCompleter<...> { ... }
class PacketSender extends CountedCompleter<...> {
PacketSender(...) { super(null, 1); ... } // 在第二次完成时触发
public void compute() { } // 从不调用
public void onCompletion(CountedCompleter<?> caller) { sendPacket(); }
}
// 示例用法:
PacketSender p = new PacketSender();
new HeaderBuilder(p, ...).fork();
new BodyBuilder(p, ...).fork();
- 自 JDK 版本:
- 1.8
- 参见:
-
Nested Class Summary
Nested classes/interfaces declared in interface java.util.concurrent.Future
Future.State
-
Constructor Summary
ModifierConstructorDescriptionprotected
创建一个新的 CountedCompleter,没有完成者并且初始挂起计数为零。protected
CountedCompleter
(CountedCompleter<?> completer) 创建一个新的 CountedCompleter,具有给定的完成者和初始挂起计数为零。protected
CountedCompleter
(CountedCompleter<?> completer, int initialPendingCount) 创建一个新的 CountedCompleter,具有给定的完成者和初始挂起计数。 -
Method Summary
Modifier and TypeMethodDescriptionfinal void
addToPendingCount
(int delta) (原子性地)将给定值添加到挂起计数中。final boolean
compareAndSetPendingCount
(int expected, int count) (原子性地)仅当当前保持给定期望值时,将挂起计数设置为给定计数。void
不管挂起计数如何,调用onCompletion(CountedCompleter)
,标记此任务为完成,并在存在时进一步触发此任务的完成者上的tryComplete()
。abstract void
compute()
此任务执行的主要计算。final int
如果挂起计数不为零,则(原子性地)将其递减。protected final boolean
exec()
为 CountedCompleters 实现执行约定。final CountedCompleter
<?> 如果此任务的挂起计数为零,则返回此任务;否则递减其挂起计数并返回null
。final CountedCompleter
<?> 返回此任务构造函数中建立的完成者,如果没有则返回null
。final int
返回当前挂起计数。返回计算的结果。final CountedCompleter
<?> getRoot()
返回当前计算的根;即,如果此任务没有完成者,则返回此任务,否则返回其完成者的根。final void
helpComplete
(int maxTasks) 如果此任务尚未完成,则尝试处理最多给定数量的其他未处理任务,这些任务在此任务的完成路径上,如果已知存在这些任务。final CountedCompleter
<?> 如果此任务没有完成者,则调用ForkJoinTask.quietlyComplete()
并返回null
。void
onCompletion
(CountedCompleter<?> caller) 当调用方法tryComplete()
并且挂起计数为零时,或者调用无条件方法complete(T)
时执行操作。boolean
onExceptionalCompletion
(Throwable ex, CountedCompleter<?> caller) 当调用方法ForkJoinTask.completeExceptionally(Throwable)
或方法compute()
抛出异常,并且此任务尚未以正常方式完成时执行操作。final void
等同于tryComplete()
,但不沿着完成路径调用onCompletion(CountedCompleter)
:如果挂起计数不为零,则递减计数;否则,类似地尝试完成此任务的完成者(如果存在),否则标记此任务为完成。此方法在不需要为计算中的每个完成者调用onCompletion
的情况下可能很有用。final void
等同于getRoot().quietlyComplete()
。final void
setPendingCount
(int count) 将挂起计数设置为给定值。protected void
setRawResult
(T t) 结果承载的 CountedCompleters 可以选择使用的方法,以帮助维护结果数据。final void
如果挂起计数不为零,则递减计数;否则调用onCompletion(CountedCompleter)
,然后类似地尝试完成此任务的完成者(如果存在),否则标记此任务为完成。Methods declared in class java.util.concurrent.ForkJoinTask
adapt, adapt, adapt, adaptInterruptible, cancel, compareAndSetForkJoinTaskTag, completeExceptionally, fork, get, get, getException, getForkJoinTaskTag, getPool, getQueuedTaskCount, getSurplusQueuedTaskCount, helpQuiesce, inForkJoinPool, invoke, invokeAll, invokeAll, invokeAll, isCancelled, isCompletedAbnormally, isCompletedNormally, isDone, join, peekNextLocalTask, pollNextLocalTask, pollSubmission, pollTask, quietlyComplete, quietlyInvoke, quietlyJoin, quietlyJoin, quietlyJoinUninterruptibly, reinitialize, setForkJoinTaskTag, tryUnfork
Methods declared in class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods declared in interface java.util.concurrent.Future
exceptionNow, resultNow, state
-
Constructor Details
-
CountedCompleter
创建一个新的 CountedCompleter,具有给定的完成者和初始挂起计数。- 参数:
-
completer
- 此任务的完成者,如果没有则为null
-
initialPendingCount
- 初始挂起计数
-
CountedCompleter
创建一个新的 CountedCompleter,具有给定的完成者和初始挂起计数为零。- 参数:
-
completer
- 此任务的完成者,如果没有则为null
-
CountedCompleter
protected CountedCompleter()创建一个新的 CountedCompleter,没有完成者并且初始挂起计数为零。
-
-
Method Details
-
compute
public abstract void compute()此任务执行的主要计算。 -
onCompletion
当调用方法tryComplete()
并且挂起计数为零时,或者调用无条件方法complete(T)
时执行操作。默认情况下,此方法不执行任何操作。您可以通过检查给定的调用者参数的标识来区分情况。如果不等于this
,则通常是一个可能包含结果(和/或链接到其他结果)以进行合并的子任务。- 参数:
-
caller
- 调用此方法的任务(可能是此任务本身)
-
onExceptionalCompletion
当调用方法ForkJoinTask.completeExceptionally(Throwable)
或方法compute()
抛出异常,并且此任务尚未以正常方式完成时执行操作。在进入此方法时,此任务ForkJoinTask.isCompletedAbnormally()
。此方法的返回值控制进一步传播:如果为true
并且此任务有一个尚未完成的完成者,则该完成者也将以相同的异常异常完成。此方法的默认实现除了返回true
外不执行任何操作。- 参数:
-
ex
- 异常 -
caller
- 调用此方法的任务(可能是此任务本身) - 返回:
-
如果此异常应传播到此任务的完成者,则为
true
-
getCompleter
返回此任务构造函数中建立的完成者,如果没有则返回null
。- 返回:
- 完成者
-
getPendingCount
public final int getPendingCount()返回当前挂起计数。- 返回:
- 当前挂起计数
-
setPendingCount
public final void setPendingCount(int count) 将挂起计数设置为给定值。- 参数:
-
count
- 计数
-
addToPendingCount
public final void addToPendingCount(int delta) (原子性地)将给定值添加到挂起计数中。- 参数:
-
delta
- 要添加的值
-
compareAndSetPendingCount
public final boolean compareAndSetPendingCount(int expected, int count) (原子性地)仅当当前保持给定期望值时,将挂起计数设置为给定计数。- 参数:
-
expected
- 期望的值 -
count
- 新值 - 返回:
-
如果成功则为
true
-
decrementPendingCountUnlessZero
public final int decrementPendingCountUnlessZero()如果挂起计数不为零,则(原子性地)递减它。- 返回:
- 进入此方法时的初始(未递减)挂起计数
-
getRoot
返回当前计算的根;即,如果此任务没有完成者,则返回此任务,否则返回其完成者的根。- 返回:
- 当前计算的根
-
tryComplete
public final void tryComplete()如果挂起计数不为零,则递减计数;否则调用onCompletion(CountedCompleter)
,然后类似地尝试完成此任务的完成者(如果存在),否则标记此任务为完成。 -
propagateCompletion
public final void propagateCompletion()等同于tryComplete()
,但不沿着完成路径调用onCompletion(CountedCompleter)
:如果挂起计数不为零,则递减计数;否则,类似地尝试完成此任务的完成者(如果存在),否则标记此任务为完成。在某些情况下,此方法可能很有用,其中onCompletion
不应或不需要为计算中的每个完成者调用。 -
complete
不管挂起计数如何,调用onCompletion(CountedCompleter)
,标记此任务为完成,并在存在时进一步触发此任务的完成者上的tryComplete()
。给定的 rawResult 用作setRawResult(T)
的参数,然后调用onCompletion(CountedCompleter)
或标记此任务为完成;对于重写setRawResult
的类,其值仅有意义。此方法不修改挂起计数。当需要尽快完成任何一个(而不是所有)子任务结果时,此方法可能很有用。但是,在常见(和推荐)情况下,如果未覆盖
setRawResult
,则可以更简单地使用quietlyCompleteRoot()
来实现此效果。- 覆盖:
-
complete
在类ForkJoinTask<T>
中 - 参数:
-
rawResult
- 原始结果
-
firstComplete
如果此任务的挂起计数为零,则返回此任务;否则递减其挂起计数并返回null
。此方法设计用于与完成遍历循环中的nextComplete()
一起使用。- 返回:
-
如果挂起计数为零,则为此任务,否则为
null
-
nextComplete
如果此任务没有完成者,则调用ForkJoinTask.quietlyComplete()
并返回null
。或者,如果完成者的待处理计数不为零,则将该待处理计数减一并返回null
。否则,返回完成者。此方法可用作同类任务层次结构的完成遍历循环的一部分:for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) { // ... 处理 c ... }
- 返回:
-
完成者,如果没有则返回
null
-
quietlyCompleteRoot
public final void quietlyCompleteRoot()等同于getRoot().quietlyComplete()
。 -
helpComplete
public final void helpComplete(int maxTasks) 如果此任务尚未完成,则尝试处理最多给定数量的其他未处理任务,这些任务在此任务的完成路径上,如果已知存在这样的任务。- 参数:
-
maxTasks
- 要处理的最大任务数。如果小于或等于零,则不处理任何任务。
-
exec
protected final boolean exec()为CountedCompleters实现执行约定。- 指定者:
-
exec
在类ForkJoinTask<T>
- 返回:
-
如果已知此任务已正常完成,则返回
true
-
getRawResult
返回计算的结果。默认情况下,返回null
,适用于Void
操作,但在其他情况下应该被覆盖,几乎总是返回一个在完成时保存结果的字段或函数的字段。- 指定者:
-
getRawResult
在类ForkJoinTask<T>
- 返回:
- 计算的结果
-
setRawResult
可选用于帮助维护结果数据的具有结果的CountedCompleters可能使用的方法。默认情况下,不执行任何操作。不建议覆盖。但是,如果覆盖此方法以更新现有对象或字段,则通常必须定义为线程安全的。- 指定者:
-
setRawResult
在类ForkJoinTask<T>
- 参数:
-
t
- 值
-