CyclicBarrier
和CountDownLatch
,但支持更灵活的用法。
注册。 与其他屏障不同,可以随时注册要在屏障上同步的方。任务可以随时注册(使用方法register()
、bulkRegister(int)
或建立初始方数的构造方法),并在任何到达时选择性地注销(使用arriveAndDeregister()
)。与大多数基本同步构造一样,注册和注销仅影响内部计数;它们不会建立任何进一步的内部簿记,因此任务无法查询它们是否已注册。(但是,您可以通过对此类进行子类化来引入此类簿记。)
同步。 像CyclicBarrier
一样,Phaser
可以重复等待。方法arriveAndAwaitAdvance()
的效果类似于CyclicBarrier.await
。每个阶段的phaser都有一个关联的阶段编号。阶段编号从零开始,在所有方到达时前进,达到Integer.MAX_VALUE
后绕回零。使用阶段编号可以通过两种方法独立控制到达phaser时的操作和等待其他操作,这两种方法可以由任何注册方调用:
- 到达。 方法
arrive()
和arriveAndDeregister()
记录到达。这些方法不会阻塞,但会返回一个关联的到达阶段编号;即,到达应用的phaser的阶段编号。当给定阶段的最后一个方到达时,将执行一个可选操作并前进到下一阶段。这些操作由触发阶段前进的方执行,并通过覆盖方法onAdvance(int, int)
进行安排,该方法还控制终止。覆盖此方法类似于但比为CyclicBarrier
提供屏障操作更灵活。 - 等待。 方法
awaitAdvance(int)
需要指示到达阶段编号的参数,并在phaser前进到(或已经在)不同阶段时返回。与使用CyclicBarrier
的类似构造不同,方法awaitAdvance
即使等待线程被中断也会继续等待。还提供了可中断和超时版本,但任务在可中断或超时等待时遇到的异常不会改变phaser的状态。如果需要,您可以在这些异常的处理程序中执行任何相关恢复,通常在调用forceTermination
之后。Phasers也可以被在ForkJoinPool
中执行的任务使用。如果池的并行级别可以容纳最大数量的同时阻塞方,进度是得到保证的。
终止。 一个phaser可能进入终止状态,可以使用方法isTerminated()
检查。在终止时,所有同步方法立即返回而不等待前进,如负返回值所示。同样,尝试在终止时注册不会产生任何效果。当调用onAdvance
返回true
时,将触发终止。默认实现返回true
,如果注销导致注册方数量变为零。如下所示,当phaser控制具有固定迭代次数的操作时,通常方便覆盖此方法以在当前阶段编号达到阈值时导致终止。还提供方法forceTermination()
以突然释放等待线程并允许它们终止。
分层。 Phasers可以是分层的(即,在树结构中构建)。具有大量方的phaser,否则可能会经历重同步争用成本,可以设置为子phaser组共享一个公共父phaser。即使产生更大的每操作开销,这可能会大大增加吞吐量。
在分层phaser树中,子phaser与其父phaser的注册和注销是自动管理的。每当子phaser的注册方数量变为非零(如在Phaser(Phaser,int)
构造方法、register()
或bulkRegister(int)
中建立时),子phaser将与其父phaser注册。每当注册方数量由于调用arriveAndDeregister()
而变为零时,子phaser将从其父phaser注销。
监控。 虽然同步方法只能由注册方调用,但任何调用者都可以监视phaser的当前状态。在任何给定时刻,总共有getRegisteredParties()
方,其中有getArrivedParties()
方已到达当前阶段(getPhase()
)。当剩余的(getUnarrivedParties()
)方到达时,阶段前进。这些方法返回的值可能反映瞬时状态,因此通常不适用于同步控制。方法toString()
以便于非正式监视的形式返回这些状态查询的快照。
内存一致性效果:在任何形式的到达方法之前的操作 发生在相应的阶段前进和onAdvance操作之前,后者又 发生在阶段前进之后的操作。
示例用法:
可以使用Phaser
代替CountDownLatch
来控制为可变数量的方提供服务的一次性操作。典型的用法是首先注册设置此操作的方法,然后启动所有操作,然后注销,如下所示:
void runTasks(List<Runnable> tasks) {
Phaser startingGate = new Phaser(1); // "1"用于注册自身
// 创建并启动线程
for (Runnable task : tasks) {
startingGate.register();
new Thread(() -> {
startingGate.arriveAndAwaitAdvance();
task.run();
}).start();
}
// 注销自身以允许线程继续
startingGate.arriveAndDeregister();
}
导致一组线程重复执行给定迭代次数的操作的一种方法是覆盖onAdvance
:
void startTasks(List<Runnable> tasks, int iterations) {
Phaser phaser = new Phaser() {
protected boolean onAdvance(int phase, int registeredParties) {
return phase >= iterations - 1 || registeredParties == 0;
}
};
phaser.register();
for (Runnable task : tasks) {
phaser.register();
new Thread(() -> {
do {
task.run();
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
}).start();
}
// 允许线程继续;不等待它们
phaser.arriveAndDeregister();
}
如果主任务必须稍后等待终止,可以重新注册然后执行类似的循环:
// ...
phaser.register();
while (!phaser.isTerminated())
phaser.arriveAndAwaitAdvance();
相关构造可以用于在您确定阶段永远不会绕过Integer.MAX_VALUE
的情况下等待特定阶段编号。例如:
void awaitPhase(Phaser phaser, int phase) {
int p = phaser.register(); // 假设调用者尚未注册
while (p < phase) {
if (phaser.isTerminated())
// ... 处理意外终止
else
p = phaser.arriveAndAwaitAdvance();
}
phaser.arriveAndDeregister();
}
要使用phaser树创建一组n
个任务,可以使用以下形式的代码,假设有一个接受Phaser
的构造函数并在构造时注册的Task类。在调用build(new Task[n], 0, n, new Phaser())
之后,这些任务可以启动,例如通过提交到池:
void build(Task[] tasks, int lo, int hi, Phaser ph) {
if (hi - lo > TASKS_PER_PHASER) {
for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
int j = Math.min(i + TASKS_PER_PHASER, hi);
build(tasks, i, j, new Phaser(ph));
}
} else {
for (int i = lo; i < hi; ++i)
tasks[i] = new Task(ph);
// 假设new Task(ph)执行ph.register()
}
}
TASKS_PER_PHASER
的最佳值主要取决于预期的同步速率。对于极小的每阶段任务体(因此高速率),可能适合的值为四,或者对于极大的任务体,可能为数百。
实现注意事项:此实现将最大方数限制为65535。尝试注册额外方将导致IllegalStateException
。但是,您可以并且应该创建分层phaser以容纳任意大的参与者集。
- 自从:
- 1.7
-
Constructor Summary
-
Method Summary
Modifier and TypeMethodDescriptionint
arrive()
到达此phaser,而不等待其他方到达。int
到达此phaser并等待其他方。int
到达此phaser并从中注销,而不等待其他方到达。int
awaitAdvance
(int phase) 等待此phaser的阶段从给定阶段值前进,如果当前阶段不等于给定阶段值或此phaser已终止,则立即返回。int
awaitAdvanceInterruptibly
(int phase) 等待此phaser的阶段从给定阶段值前进,如果在等待时被中断则抛出InterruptedException
,或者如果当前阶段不等于给定阶段值或此phaser已终止,则立即返回。int
awaitAdvanceInterruptibly
(int phase, long timeout, TimeUnit unit) 等待此屏障的阶段从给定阶段值或给定超时时间前进,如果在等待时被中断则抛出InterruptedException
,如果当前阶段不等于给定阶段值或此屏障已终止,则立即返回。int
bulkRegister
(int parties) 向此屏障添加给定数量的新未到达方。void
强制此屏障进入终止状态。int
返回已到达此屏障当前阶段的注册方数量。返回此屏障的父级,如果没有则返回null
。final int
getPhase()
返回当前阶段编号。int
返回在此屏障注册的方数量。getRoot()
返回此屏障的根祖先,如果没有父级则与此屏障相同。int
返回尚未到达此屏障当前阶段的注册方数量。boolean
如果此屏障已终止,则返回true
。protected boolean
onAdvance
(int phase, int registeredParties) 可重写的方法,在即将到来的阶段前执行操作,并控制终止。int
register()
向此屏障添加一个新的未到达方。toString()
返回标识此屏障及其状态的字符串。
-
Constructor Details
-
Phaser
public Phaser()创建一个新的屏障,没有初始注册方,没有父级,初始阶段编号为0。使用此屏障的任何线程都需要首先为其注册。 -
Phaser
public Phaser(int parties) 创建一个具有给定数量已注册未到达方的新屏障,没有父级,初始阶段编号为0。- 参数:
-
parties
- 前进到下一阶段所需的方数量 - 抛出:
-
IllegalArgumentException
- 如果方少于零或大于支持的最大方数量
-
Phaser
- 参数:
-
parent
- 父屏障
-
Phaser
创建一个具有给定父级和已注册未到达方数量的新屏障。当给定父级非空且给定方数量大于零时,此子屏障将与其父屏障注册。- 参数:
-
parent
- 父屏障 -
parties
- 前进到下一阶段所需的方数量 - 抛出:
-
IllegalArgumentException
- 如果方少于零或大于支持的最大方数量
-
-
Method Details
-
register
public int register()向此屏障添加一个新的未到达方。如果正在进行onAdvance(int, int)
的调用,则此方法可能在返回之前等待其完成。如果此屏障有父级,并且此屏障先前没有注册方,则此子屏障也将与其父屏障注册。如果此屏障已终止,则注册尝试无效,并返回负值。- 返回:
- 此注册应用的到达阶段编号。如果此值为负,则此屏障已终止,在这种情况下注册无效。
- 抛出:
-
IllegalStateException
- 如果尝试注册超过支持的最大方数量
-
bulkRegister
public int bulkRegister(int parties) 向此屏障添加给定数量的新未到达方。如果正在进行onAdvance(int, int)
的调用,则此方法可能在返回之前等待其完成。如果此屏障有父级,并且给定方数量大于零,并且此屏障先前没有注册方,则此子屏障也将与其父屏障注册。如果此屏障已终止,则注册尝试无效,并返回负值。- 参数:
-
parties
- 前进到下一阶段所需的额外方数量 - 返回:
- 此注册应用的到达阶段编号。如果此值为负,则此屏障已终止,在这种情况下注册无效。
- 抛出:
-
IllegalStateException
- 如果尝试注册超过支持的最大方数量 -
IllegalArgumentException
- 如果parties < 0
-
arrive
public int arrive()到达此屏障,无需等待其他方到达。对于未注册的方调用此方法是使用错误。但是,如果有的话,此错误可能仅在此屏障的某些后续操作中导致
IllegalStateException
。- 返回:
- 到达阶段编号,如果已终止则为负值
- 抛出:
-
IllegalStateException
- 如果未终止且未到达方数量将变为负数
-
arriveAndDeregister
public int arriveAndDeregister()到达此屏障并取消注册,无需等待其他方到达。取消注册会减少未来阶段中前进所需的方数量。如果此屏障有父级,并且取消注册导致此屏障具有零方,则此屏障也将从其父屏障中取消注册。对于未注册的方调用此方法是使用错误。但是,如果有的话,此错误可能仅在此屏障的某些后续操作中导致
IllegalStateException
。- 返回:
- 到达阶段编号,如果已终止则为负值
- 抛出:
-
IllegalStateException
- 如果未终止且已注册或未到达方数量将变为负数
-
arriveAndAwaitAdvance
public int arriveAndAwaitAdvance()到达此屏障并等待其他方。在效果上等同于awaitAdvance(arrive())
。如果需要等待中断或超时,可以使用另一种形式的awaitAdvance
方法来安排。如果需要到达后取消注册,则使用awaitAdvance(arriveAndDeregister())
。对于未注册的方调用此方法是使用错误。但是,如果有的话,此错误可能仅在此屏障的某些后续操作中导致
IllegalStateException
。- 返回:
- 到达阶段编号,如果已终止则为(负) 当前阶段
- 抛出:
-
IllegalStateException
- 如果未终止且未到达方数量将变为负数
-
awaitAdvance
public int awaitAdvance(int phase) 等待此屏障的阶段从给定阶段值前进,如果当前阶段不等于给定阶段值或此屏障已终止,则立即返回。- 参数:
-
phase
- 到达阶段编号,如果已终止则为负值;此参数通常是前一次调用arrive
或arriveAndDeregister
返回的值。 - 返回:
- 下一个到达阶段编号,如果参数为负值则为参数本身,如果已终止则为(负) 当前阶段
-
awaitAdvanceInterruptibly
等待此屏障的阶段从给定阶段值前进,如果当前阶段不等于给定阶段值或此屏障已终止,则抛出InterruptedException
。- 参数:
-
phase
- 到达阶段编号,如果已终止则为负值;此参数通常是前一次调用arrive
或arriveAndDeregister
返回的值。 - 返回:
- 下一个到达阶段编号,如果参数为负值则为参数本身,如果已终止则为(负) 当前阶段
- 抛出:
-
InterruptedException
- 如果在等待时线程被中断
-
awaitAdvanceInterruptibly
public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException 等待此屏障的阶段从给定阶段值或给定超时时间前进,如果当前阶段不等于给定阶段值或此屏障已终止,则抛出InterruptedException
。- 参数:
-
phase
- 到达阶段编号,如果已终止则为负值;此参数通常是前一次调用arrive
或arriveAndDeregister
返回的值。 -
timeout
- 等待超时时间,以unit
为单位 -
unit
- 确定如何解释timeout
参数的TimeUnit
- 返回:
- 下一个到达阶段编号,如果参数为负值则为参数本身,如果已终止则为(负) 当前阶段
- 抛出:
-
InterruptedException
- 如果在等待时线程被中断 -
TimeoutException
- 如果等待超时
-
forceTermination
public void forceTermination()强制此屏障进入终止状态。注册方数量不受影响。如果此屏障是分层屏障集的成员,则集合中的所有屏障都将终止。如果此屏障已终止,则此方法无效。此方法可能在一个或多个任务遇到意外异常后协调恢复时有用。 -
getPhase
public final int getPhase()返回当前阶段编号。最大阶段编号为Integer.MAX_VALUE
,之后会从零重新开始。在终止时,阶段编号为负值,此时可以通过getPhase() + Integer.MIN_VALUE
获取终止前的主导阶段。- 返回:
- 阶段编号,如果已终止则为负值
-
getRegisteredParties
public int getRegisteredParties()返回在此屏障注册的方数量。- 返回:
- 当事方的数量
-
getArrivedParties
public int getArrivedParties()返回已经到达当前阶段的注册方的数量。如果这个屏障已经终止,返回的值是毫无意义和任意的。- 返回:
- 已到达方的数量
-
getUnarrivedParties
public int getUnarrivedParties()返回尚未到达当前阶段的注册方的数量。如果这个屏障已经终止,返回的值是毫无意义和任意的。- 返回:
- 未到达方的数量
-
getParent
返回这个屏障的父屏障,如果没有则返回null
。- 返回:
-
这个屏障的父屏障,如果没有则返回
null
-
getRoot
返回这个屏障的根祖先,如果没有父屏障则与这个屏障相同。- 返回:
- 这个屏障的根祖先
-
isTerminated
public boolean isTerminated()如果这个屏障已经终止,则返回true
。- 返回:
-
如果这个屏障已经终止,则返回
true
-
onAdvance
protected boolean onAdvance(int phase, int registeredParties) 可重写的方法,在即将发生阶段推进时执行操作,并控制终止。当推进这个屏障的方到达时(当所有其他等待方处于休眠状态时),会调用这个方法。如果这个方法返回true
,则在推进时这个屏障将被设置为最终终止状态,并且后续调用isTerminated()
将返回true。如果这个方法的调用引发(未检查的)异常或错误,则会传播到试图推进这个屏障的方,此时不会发生推进。这个方法的参数提供了当前转换中屏障的状态。在
onAdvance
中调用到的到达、注册和等待方法对这个屏障的影响是未指定的,不应依赖于它们。如果这个屏障是层级屏障集的成员,则
onAdvance
仅对其根屏障在每次推进时调用。为了支持最常见的用例,这个方法的默认实现在注册方的数量因某个方调用
arriveAndDeregister
而变为零时返回true
。您可以通过重写这个方法始终返回false
来禁用此行为,从而在未来的注册时继续:Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int parties) { return false; } };
- 参数:
-
phase
- 进入此方法时的当前阶段编号,在此屏障推进之前 -
registeredParties
- 当前注册方的数量 - 返回:
-
如果这个屏障应该终止,则返回
true
-
toString
返回一个标识此屏障及其状态的字符串。状态在括号中,包括字符串"phase = "
后跟阶段编号,"parties = "
后跟注册方的数量,以及"arrived = "
后跟已到达方的数量。
-