CyclicBarrier
支持一个可选的Runnable
命令,该命令在每个屏障点运行一次,在团队中的最后一个线程到达之后,但在任何线程被释放之前。这个barrier action对于在任何团队继续之前更新共享状态很有用。
示例用法:以下是在并行分解设计中使用屏障的示例:
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
Runnable barrierAction = () -> mergeRows(...);
barrier = new CyclicBarrier(N, barrierAction);
List<Thread> threads = new ArrayList<>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}
// 等待直到完成
for (Thread thread : threads)
try {
thread.join();
} catch (InterruptedException ex) { }
}
}
在这里,每个工作线程处理矩阵的一行,然后在屏障处等待,直到所有行都被处理。当所有行都被处理时,提供的Runnable
屏障动作被执行并合并行。如果合并器确定已找到解决方案,则done()
将返回true
,并且每个工作线程将终止。
如果屏障动作在执行时不依赖于被暂停的团队,则在释放时,团队中的任何线程都可以执行该动作。为了实现这一点,每次调用await()
都会返回该线程在屏障处的到达索引。然后,您可以选择哪个线程应该执行屏障动作,例如:
if (barrier.await() == 0) {
// 记录此迭代的完成
}
CyclicBarrier
对于失败的同步尝试使用全有或全无的破坏模型:如果一个线程因中断、失败或超时而提前离开屏障点,那么等待在该屏障点的所有其他线程也将通过BrokenBarrierException
(或InterruptedException
,如果它们也在大约同一时间被中断)异常异常地离开。
内存一致性效果:在调用await()
之前的线程中的操作先行发生于屏障动作的一部分,而这又先行发生于其他线程中对应的await()
成功返回后的操作。
- 自 JDK 版本:
- 1.5
- 参见:
-
Constructor Summary
ConstructorDescriptionCyclicBarrier
(int parties) 创建一个新的CyclicBarrier
,当给定数量的团队(线程)在其上等待时将触发,并且在触发屏障时不执行预定义操作。CyclicBarrier
(int parties, Runnable barrierAction) 创建一个新的CyclicBarrier
,当给定数量的团队(线程)在其上等待时将触发,并且在触发屏障时执行给定的屏障动作,由进入屏障的最后一个线程执行。 -
Method Summary
-
Constructor Details
-
CyclicBarrier
创建一个新的CyclicBarrier
,当给定数量的团队(线程)在其上等待时将触发,并且在触发屏障时执行给定的屏障动作,由进入屏障的最后一个线程执行。- 参数:
-
parties
- 必须在触发屏障之前调用await()
的线程数 -
barrierAction
- 在触发屏障时要执行的命令,如果没有操作则为null
- 抛出:
-
IllegalArgumentException
- 如果parties
小于 1
-
CyclicBarrier
public CyclicBarrier(int parties) 创建一个新的CyclicBarrier
,当给定数量的团队(线程)在其上等待时将触发,并且在触发屏障时不执行预定义操作。- 参数:
-
parties
- 必须在触发屏障之前调用await()
的线程数 - 抛出:
-
IllegalArgumentException
- 如果parties
小于 1
-
-
Method Details
-
getParties
public int getParties()返回触发此屏障所需的团队数量。- 返回:
- 触发此屏障所需的团队数量
-
await
等待,直到所有团队在此屏障上调用await
。如果当前线程不是最后一个到达的线程,则出于线程调度目的,它将被禁用并处于休眠状态,直到发生以下情况之一:
如果当前线程:
- 在进入此方法时设置了中断状态;或
- 在等待时被中断
InterruptedException
,并清除当前线程的中断状态。如果在任何线程等待时重置屏障,或者在调用
await
时屏障被破坏,或者在任何线程等待时屏障被破坏,则会抛出BrokenBarrierException
。如果任何线程在等待时被中断,则所有其他等待线程将抛出
BrokenBarrierException
,并且屏障将处于破坏状态。如果当前线程是最后一个到达的线程,并且在构造函数中提供了一个非空的屏障动作,则当前线程将在允许其他线程继续之前运行该动作。如果在屏障动作期间发生异常,则该异常将在当前线程中传播,并且屏障将处于破坏状态。
- 返回:
-
当前线程的到达索引,其中索引
getParties() - 1
表示第一个到达,零表示最后一个到达 - 抛出:
-
InterruptedException
- 如果当前线程在等待时被中断 -
BrokenBarrierException
- 如果另一个线程在当前线程等待时被中断或超时,或者在调用await
时屏障被重置,或者在屏障被破坏时,或者由于异常导致屏障动作(如果存在)失败
-
await
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException 等待直到所有参与方在此屏障上调用await
,或指定的等待时间过去。如果当前线程不是最后一个到达的线程,则出于线程调度目的,它将被禁用并处于休眠状态,直到发生以下情况之一:
如果当前线程:
- 在进入此方法时设置了中断状态;或
- 在等待时被中断
InterruptedException
,并清除当前线程的中断状态。如果指定的等待时间过去,则会抛出
TimeoutException
。如果时间小于或等于零,则该方法将根本不等待。如果任何线程在等待时屏障被
reset()
,或者在调用await
时屏障已损坏,或者在任何线程等待时屏障被BrokenBarrierException
抛出。如果任何线程在等待时被中断,则所有其他等待线程将抛出
BrokenBarrierException
,并且屏障将处于损坏状态。如果当前线程是最后一个到达的线程,并且在构造函数中提供了一个非空的屏障动作,则当前线程在允许其他线程继续之前运行该动作。如果在屏障动作期间发生异常,则该异常将在当前线程中传播,并且屏障将处于损坏状态。
- 参数:
-
timeout
- 等待屏障的时间 -
unit
- 超时参数的时间单位 - 返回:
-
当前线程的到达索引,其中索引
getParties() - 1
表示第一个到达,零表示最后一个到达 - 抛出:
-
InterruptedException
- 如果当前线程在等待时被中断 -
TimeoutException
- 如果指定的超时时间过去。在这种情况下,屏障将被破坏。 -
BrokenBarrierException
- 如果另一个线程在当前线程等待时被中断或超时,或者在调用await
时屏障被重置,或者在屏障由于异常而失败时
-
isBroken
public boolean isBroken()查询此屏障是否处于破坏状态。- 返回:
-
如果自从构造或上次重置以来,有一个或多个参与方因中断或超时而突破此屏障,或者由于异常而导致屏障动作失败,则返回
true
;否则返回false
。
-
reset
public void reset()将屏障重置为初始状态。如果当前有任何参与方正在屏障处等待,则它们将返回一个BrokenBarrierException
。请注意,对于其他原因导致破坏后的重置可能会很复杂;线程需要以其他方式重新同步,并选择一个线程执行重置。最好是创建一个新的屏障以供后续使用。 -
getNumberWaiting
public int getNumberWaiting()返回当前正在屏障处等待的参与方数量。此方法主要用于调试和断言。- 返回:
-
当前在
await()
中阻塞的参与方数量
-