Module java.base

Class Phaser

java.lang.Object
java.util.concurrent.Phaser

public class Phaser extends Object
一个可重复使用的同步屏障,类似于CyclicBarrierCountDownLatch,但支持更灵活的用法。

注册。 与其他屏障不同,可以随时注册要在屏障上同步的方。任务可以随时注册(使用方法register()bulkRegister(int)或建立初始方数的构造方法),并在任何到达时选择性地注销(使用arriveAndDeregister())。与大多数基本同步构造一样,注册和注销仅影响内部计数;它们不会建立任何进一步的内部簿记,因此任务无法查询它们是否已注册。(但是,您可以通过对此类进行子类化来引入此类簿记。)

同步。CyclicBarrier一样,Phaser可以重复等待。方法arriveAndAwaitAdvance()的效果类似于CyclicBarrier.await。每个阶段的phaser都有一个关联的阶段编号。阶段编号从零开始,在所有方到达时前进,达到Integer.MAX_VALUE后绕回零。使用阶段编号可以通过两种方法独立控制到达phaser时的操作和等待其他操作,这两种方法可以由任何注册方调用:

  • 到达。 方法arrive()arriveAndDeregister()记录到达。这些方法不会阻塞,但会返回一个关联的到达阶段编号;即,到达应用的phaser的阶段编号。当给定阶段的最后一个方到达时,将执行一个可选操作并前进到下一阶段。这些操作由触发阶段前进的方执行,并通过覆盖方法onAdvance(int, int)进行安排,该方法还控制终止。覆盖此方法类似于但比为CyclicBarrier提供屏障操作更灵活。
  • 等待。 方法awaitAdvance(int)需要指示到达阶段编号的参数,并在phaser前进到(或已经在)不同阶段时返回。与使用CyclicBarrier的类似构造不同,方法awaitAdvance即使等待线程被中断也会继续等待。还提供了可中断和超时版本,但任务在可中断或超时等待时遇到的异常不会改变phaser的状态。如果需要,您可以在这些异常的处理程序中执行任何相关恢复,通常在调用forceTermination之后。Phasers也可以被在ForkJoinPool中执行的任务使用。如果池的并行级别可以容纳最大数量的同时阻塞方,进度是得到保证的。

终止。 一个phaser可能进入终止状态,可以使用方法isTerminated()检查。在终止时,所有同步方法立即返回而不等待前进,如负返回值所示。同样,尝试在终止时注册不会产生任何效果。当调用onAdvance返回true时,将触发终止。默认实现返回true,如果注销导致注册方数量变为零。如下所示,当phaser控制具有固定迭代次数的操作时,通常方便覆盖此方法以在当前阶段编号达到阈值时导致终止。还提供方法forceTermination()以突然释放等待线程并允许它们终止。

分层。 Phasers可以是分层的(即,在树结构中构建)。具有大量方的phaser,否则可能会经历重同步争用成本,可以设置为子phaser组共享一个公共父phaser。即使产生更大的每操作开销,这可能会大大增加吞吐量。

在分层phaser树中,子phaser与其父phaser的注册和注销是自动管理的。每当子phaser的注册方数量变为非零(如在Phaser(Phaser,int)构造方法、register()bulkRegister(int)中建立时),子phaser将与其父phaser注册。每当注册方数量由于调用arriveAndDeregister()而变为零时,子phaser将从其父phaser注销。

监控。 虽然同步方法只能由注册方调用,但任何调用者都可以监视phaser的当前状态。在任何给定时刻,总共有getRegisteredParties()方,其中有getArrivedParties()方已到达当前阶段(getPhase())。当剩余的(getUnarrivedParties())方到达时,阶段前进。这些方法返回的值可能反映瞬时状态,因此通常不适用于同步控制。方法toString()以便于非正式监视的形式返回这些状态查询的快照。

内存一致性效果:在任何形式的到达方法之前的操作 发生在相应的阶段前进和onAdvance操作之前,后者又 发生在阶段前进之后的操作。

示例用法:

可以使用Phaser代替CountDownLatch来控制为可变数量的方提供服务的一次性操作。典型的用法是首先注册设置此操作的方法,然后启动所有操作,然后注销,如下所示:

 
 void runTasks(List<Runnable> tasks) {
   Phaser startingGate = new Phaser(1); // "1"用于注册自身
   // 创建并启动线程
   for (Runnable task : tasks) {
     startingGate.register();
     new Thread(() -> {
       startingGate.arriveAndAwaitAdvance();
       task.run();
     }).start();
   }

   // 注销自身以允许线程继续
   startingGate.arriveAndDeregister();
 }

导致一组线程重复执行给定迭代次数的操作的一种方法是覆盖onAdvance

 
 void startTasks(List<Runnable> tasks, int iterations) {
   Phaser phaser = new Phaser() {
     protected boolean onAdvance(int phase, int registeredParties) {
       return phase >= iterations - 1 || registeredParties == 0;
     }
   };
   phaser.register();
   for (Runnable task : tasks) {
     phaser.register();
     new Thread(() -> {
       do {
         task.run();
         phaser.arriveAndAwaitAdvance();
       } while (!phaser.isTerminated());
     }).start();
   }
   // 允许线程继续;不等待它们
   phaser.arriveAndDeregister();
 }
如果主任务必须稍后等待终止,可以重新注册然后执行类似的循环:
 
   // ...
   phaser.register();
   while (!phaser.isTerminated())
     phaser.arriveAndAwaitAdvance();

相关构造可以用于在您确定阶段永远不会绕过Integer.MAX_VALUE的情况下等待特定阶段编号。例如:

 
 void awaitPhase(Phaser phaser, int phase) {
   int p = phaser.register(); // 假设调用者尚未注册
   while (p < phase) {
     if (phaser.isTerminated())
       // ... 处理意外终止
     else
       p = phaser.arriveAndAwaitAdvance();
   }
   phaser.arriveAndDeregister();
 }

要使用phaser树创建一组n个任务,可以使用以下形式的代码,假设有一个接受Phaser的构造函数并在构造时注册的Task类。在调用build(new Task[n], 0, n, new Phaser())之后,这些任务可以启动,例如通过提交到池:

 
 void build(Task[] tasks, int lo, int hi, Phaser ph) {
   if (hi - lo > TASKS_PER_PHASER) {
     for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
       int j = Math.min(i + TASKS_PER_PHASER, hi);
       build(tasks, i, j, new Phaser(ph));
     }
   } else {
     for (int i = lo; i < hi; ++i)
       tasks[i] = new Task(ph);
       // 假设new Task(ph)执行ph.register()
   }
 }
TASKS_PER_PHASER的最佳值主要取决于预期的同步速率。对于极小的每阶段任务体(因此高速率),可能适合的值为四,或者对于极大的任务体,可能为数百。

实现注意事项:此实现将最大方数限制为65535。尝试注册额外方将导致IllegalStateException。但是,您可以并且应该创建分层phaser以容纳任意大的参与者集。

自从:
1.7
  • Constructor Summary

    Constructors
    Constructor
    Description
    Phaser()
    创建一个新的phaser,没有初始注册方,没有父级,初始阶段编号为0。
    Phaser(int parties)
    创建一个新的phaser,具有给定数量的已注册但未到达的方,没有父级,初始阶段编号为0。
    Phaser(Phaser parent)
    等同于Phaser(parent, 0)
    Phaser(Phaser parent, int parties)
    创建一个新的phaser,具有给定父级和已注册但未到达的方数量。
  • Method Summary

    Modifier and Type
    Method
    Description
    int
    arrive()
    到达此phaser,而不等待其他方到达。
    int
    到达此phaser并等待其他方。
    int
    到达此phaser并从中注销,而不等待其他方到达。
    int
    awaitAdvance(int phase)
    等待此phaser的阶段从给定阶段值前进,如果当前阶段不等于给定阶段值或此phaser已终止,则立即返回。
    int
    等待此phaser的阶段从给定阶段值前进,如果在等待时被中断则抛出InterruptedException,或者如果当前阶段不等于给定阶段值或此phaser已终止,则立即返回。
    int
    awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
    等待此屏障的阶段从给定阶段值或给定超时时间前进,如果在等待时被中断则抛出 InterruptedException,如果当前阶段不等于给定阶段值或此屏障已终止,则立即返回。
    int
    bulkRegister(int parties)
    向此屏障添加给定数量的新未到达方。
    void
    强制此屏障进入终止状态。
    int
    返回已到达此屏障当前阶段的注册方数量。
    返回此屏障的父级,如果没有则返回null
    final int
    返回当前阶段编号。
    int
    返回在此屏障注册的方数量。
    返回此屏障的根祖先,如果没有父级则与此屏障相同。
    int
    返回尚未到达此屏障当前阶段的注册方数量。
    boolean
    如果此屏障已终止,则返回true
    protected boolean
    onAdvance(int phase, int registeredParties)
    可重写的方法,在即将到来的阶段前执行操作,并控制终止。
    int
    向此屏障添加一个新的未到达方。
    返回标识此屏障及其状态的字符串。

    Methods declared in class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
  • Constructor Details

    • Phaser

      public Phaser()
      创建一个新的屏障,没有初始注册方,没有父级,初始阶段编号为0。使用此屏障的任何线程都需要首先为其注册。
    • Phaser

      public Phaser(int parties)
      创建一个具有给定数量已注册未到达方的新屏障,没有父级,初始阶段编号为0。
      参数:
      parties - 前进到下一阶段所需的方数量
      抛出:
      IllegalArgumentException - 如果方少于零或大于支持的最大方数量
    • Phaser

      public Phaser(Phaser parent)
      等效于Phaser(parent, 0)
      参数:
      parent - 父屏障
    • Phaser

      public Phaser(Phaser parent, int parties)
      创建一个具有给定父级和已注册未到达方数量的新屏障。当给定父级非空且给定方数量大于零时,此子屏障将与其父屏障注册。
      参数:
      parent - 父屏障
      parties - 前进到下一阶段所需的方数量
      抛出:
      IllegalArgumentException - 如果方少于零或大于支持的最大方数量
  • Method Details

    • register

      public int register()
      向此屏障添加一个新的未到达方。如果正在进行onAdvance(int, int)的调用,则此方法可能在返回之前等待其完成。如果此屏障有父级,并且此屏障先前没有注册方,则此子屏障也将与其父屏障注册。如果此屏障已终止,则注册尝试无效,并返回负值。
      返回:
      此注册应用的到达阶段编号。如果此值为负,则此屏障已终止,在这种情况下注册无效。
      抛出:
      IllegalStateException - 如果尝试注册超过支持的最大方数量
    • bulkRegister

      public int bulkRegister(int parties)
      向此屏障添加给定数量的新未到达方。如果正在进行onAdvance(int, int)的调用,则此方法可能在返回之前等待其完成。如果此屏障有父级,并且给定方数量大于零,并且此屏障先前没有注册方,则此子屏障也将与其父屏障注册。如果此屏障已终止,则注册尝试无效,并返回负值。
      参数:
      parties - 前进到下一阶段所需的额外方数量
      返回:
      此注册应用的到达阶段编号。如果此值为负,则此屏障已终止,在这种情况下注册无效。
      抛出:
      IllegalStateException - 如果尝试注册超过支持的最大方数量
      IllegalArgumentException - 如果parties < 0
    • arrive

      public int arrive()
      到达此屏障,无需等待其他方到达。

      对于未注册的方调用此方法是使用错误。但是,如果有的话,此错误可能仅在此屏障的某些后续操作中导致 IllegalStateException

      返回:
      到达阶段编号,如果已终止则为负值
      抛出:
      IllegalStateException - 如果未终止且未到达方数量将变为负数
    • arriveAndDeregister

      public int arriveAndDeregister()
      到达此屏障并取消注册,无需等待其他方到达。取消注册会减少未来阶段中前进所需的方数量。如果此屏障有父级,并且取消注册导致此屏障具有零方,则此屏障也将从其父屏障中取消注册。

      对于未注册的方调用此方法是使用错误。但是,如果有的话,此错误可能仅在此屏障的某些后续操作中导致 IllegalStateException

      返回:
      到达阶段编号,如果已终止则为负值
      抛出:
      IllegalStateException - 如果未终止且已注册或未到达方数量将变为负数
    • arriveAndAwaitAdvance

      public int arriveAndAwaitAdvance()
      到达此屏障并等待其他方。在效果上等同于awaitAdvance(arrive())。如果需要等待中断或超时,可以使用另一种形式的 awaitAdvance方法来安排。如果需要到达后取消注册,则使用awaitAdvance(arriveAndDeregister())

      对于未注册的方调用此方法是使用错误。但是,如果有的话,此错误可能仅在此屏障的某些后续操作中导致 IllegalStateException

      返回:
      到达阶段编号,如果已终止则为(负) 当前阶段
      抛出:
      IllegalStateException - 如果未终止且未到达方数量将变为负数
    • awaitAdvance

      public int awaitAdvance(int phase)
      等待此屏障的阶段从给定阶段值前进,如果当前阶段不等于给定阶段值或此屏障已终止,则立即返回。
      参数:
      phase - 到达阶段编号,如果已终止则为负值;此参数通常是前一次调用arrivearriveAndDeregister返回的值。
      返回:
      下一个到达阶段编号,如果参数为负值则为参数本身,如果已终止则为(负) 当前阶段
    • awaitAdvanceInterruptibly

      public int awaitAdvanceInterruptibly(int phase) throws InterruptedException
      等待此屏障的阶段从给定阶段值前进,如果当前阶段不等于给定阶段值或此屏障已终止,则抛出InterruptedException
      参数:
      phase - 到达阶段编号,如果已终止则为负值;此参数通常是前一次调用arrivearriveAndDeregister返回的值。
      返回:
      下一个到达阶段编号,如果参数为负值则为参数本身,如果已终止则为(负) 当前阶段
      抛出:
      InterruptedException - 如果在等待时线程被中断
    • awaitAdvanceInterruptibly

      public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
      等待此屏障的阶段从给定阶段值或给定超时时间前进,如果当前阶段不等于给定阶段值或此屏障已终止,则抛出InterruptedException
      参数:
      phase - 到达阶段编号,如果已终止则为负值;此参数通常是前一次调用arrivearriveAndDeregister返回的值。
      timeout - 等待超时时间,以unit为单位
      unit - 确定如何解释timeout参数的TimeUnit
      返回:
      下一个到达阶段编号,如果参数为负值则为参数本身,如果已终止则为(负) 当前阶段
      抛出:
      InterruptedException - 如果在等待时线程被中断
      TimeoutException - 如果等待超时
    • forceTermination

      public void forceTermination()
      强制此屏障进入终止状态。注册方数量不受影响。如果此屏障是分层屏障集的成员,则集合中的所有屏障都将终止。如果此屏障已终止,则此方法无效。此方法可能在一个或多个任务遇到意外异常后协调恢复时有用。
    • getPhase

      public final int getPhase()
      返回当前阶段编号。最大阶段编号为Integer.MAX_VALUE,之后会从零重新开始。在终止时,阶段编号为负值,此时可以通过getPhase() + Integer.MIN_VALUE获取终止前的主导阶段。
      返回:
      阶段编号,如果已终止则为负值
    • getRegisteredParties

      public int getRegisteredParties()
      返回在此屏障注册的方数量。
      返回:
      当事方的数量
    • getArrivedParties

      public int getArrivedParties()
      返回已经到达当前阶段的注册方的数量。如果这个屏障已经终止,返回的值是毫无意义和任意的。
      返回:
      已到达方的数量
    • getUnarrivedParties

      public int getUnarrivedParties()
      返回尚未到达当前阶段的注册方的数量。如果这个屏障已经终止,返回的值是毫无意义和任意的。
      返回:
      未到达方的数量
    • getParent

      public Phaser getParent()
      返回这个屏障的父屏障,如果没有则返回null
      返回:
      这个屏障的父屏障,如果没有则返回null
    • getRoot

      public Phaser getRoot()
      返回这个屏障的根祖先,如果没有父屏障则与这个屏障相同。
      返回:
      这个屏障的根祖先
    • isTerminated

      public boolean isTerminated()
      如果这个屏障已经终止,则返回true
      返回:
      如果这个屏障已经终止,则返回true
    • onAdvance

      protected boolean onAdvance(int phase, int registeredParties)
      可重写的方法,在即将发生阶段推进时执行操作,并控制终止。当推进这个屏障的方到达时(当所有其他等待方处于休眠状态时),会调用这个方法。如果这个方法返回true,则在推进时这个屏障将被设置为最终终止状态,并且后续调用isTerminated()将返回true。如果这个方法的调用引发(未检查的)异常或错误,则会传播到试图推进这个屏障的方,此时不会发生推进。

      这个方法的参数提供了当前转换中屏障的状态。在onAdvance中调用到的到达、注册和等待方法对这个屏障的影响是未指定的,不应依赖于它们。

      如果这个屏障是层级屏障集的成员,则onAdvance仅对其根屏障在每次推进时调用。

      为了支持最常见的用例,这个方法的默认实现在注册方的数量因某个方调用arriveAndDeregister而变为零时返回true。您可以通过重写这个方法始终返回false来禁用此行为,从而在未来的注册时继续:

       
       Phaser phaser = new Phaser() {
         protected boolean onAdvance(int phase, int parties) { return false; }
       };
      参数:
      phase - 进入此方法时的当前阶段编号,在此屏障推进之前
      registeredParties - 当前注册方的数量
      返回:
      如果这个屏障应该终止,则返回true
    • toString

      public String toString()
      返回一个标识此屏障及其状态的字符串。状态在括号中,包括字符串 "phase = "后跟阶段编号,"parties = "后跟注册方的数量,以及 "arrived = "后跟已到达方的数量。
      覆盖:
      toString 在类 Object
      返回:
      一个标识此屏障及其状态的字符串