Module java.base

Class StructuredTaskScope<T>

java.lang.Object
java.util.concurrent.StructuredTaskScope<T>
类型参数:
T - 在任务范围内执行的任务的结果类型
所有已实现的接口:
AutoCloseable
直接已知的子类:
StructuredTaskScope.ShutdownOnFailure预览, StructuredTaskScope.ShutdownOnSuccess预览

public class StructuredTaskScope<T> extends Object implements AutoCloseable
StructuredTaskScope 是 Java 平台的预览 API。
仅当启用预览功能时,程序才能使用 StructuredTaskScope
预览功能可能会在将来的版本中被移除,或升级为 Java 平台的永久功能。
一个用于结构化并发的基本API。 StructuredTaskScope支持任务分裂为多个并发子任务,并且子任务必须在主任务继续之前完成。 StructuredTaskScope可用于确保并发操作的生命周期受到语法块的限制,就像结构化编程中的顺序操作一样。

基本操作

使用其公共构造函数之一创建 StructuredTaskScope。它定义了fork方法来启动一个线程来执行子任务,join方法等待所有子任务完成,close方法关闭任务范围。该API旨在与 try-with-resources语句一起使用。意图是try块中的代码使用fork方法来分叉线程以执行子任务,使用join方法等待子任务完成,然后处理结果。调用fork方法返回一个Subtask预览来表示分叉的子任务。一旦调用join,就可以使用Subtask来获取成功完成的结果,或者如果子任务失败则获取异常。
    Callable<String> task1 = ...
    Callable<Integer> task2 = ...

    try (var scope = new StructuredTaskScope<Object>()) {

        Subtask<String> subtask1 = scope.fork(task1);
        Subtask<Integer> subtask2 = scope.fork(task2);

        scope.join();

        ... 处理结果/异常 ...

    } // close

以下示例分叉了一组同类子任务,使用join方法等待所有子任务完成,并使用Subtask.State预览将子任务分区为成功完成的子任务集合和失败的子任务集合。

    List<Callable<String>> callables = ...

    try (var scope = new StructuredTaskScope<String>()) {

        List<Subtask<String>> subtasks = callables.stream().map(scope::fork).toList();

        scope.join();

        Map<Boolean, Set<Subtask<String>>> map = subtasks.stream()
                .collect(Collectors.partitioningBy(h -> h.state() == Subtask.State.SUCCESS,
                                                   Collectors.toSet()));

    } // close
joinclose方法只能由 所有者(打开/创建任务范围的线程)调用,如果所有者在分叉后未调用 join方法,则 close方法在关闭后会抛出异常。

StructuredTaskScope定义了shutdown方法来关闭任务范围而不关闭它。 shutdown()方法通过中断线程来取消所有未完成的子任务。它阻止在任务范围内启动新线程。如果所有者在join方法中等待,则会唤醒。

短路并允许子类实现不需要所有子任务完成的 策略

具有常见情况策略的子类

定义了两个StructuredTaskScope的子类来实现常见情况的策略:
  1. ShutdownOnSuccess预览捕获第一个成功完成的子任务的结果。一旦捕获,它会关闭任务范围以中断未完成的线程并唤醒所有者。此类适用于任何子任务的结果都可以的情况("调用任何"),并且不需要等待其他未完成子任务的结果。它定义了获取第一个结果或如果所有子任务失败则抛出异常的方法。
  2. ShutdownOnFailure预览捕获第一个失败的子任务的异常。一旦捕获,它会关闭任务范围以中断未完成的线程并唤醒所有者。此类适用于需要所有子任务的结果的情况("调用所有");如果任何子任务失败,则不再需要其他未完成子任务的结果。如果定义了如果任何子任务失败则抛出异常的方法。
join中等待,直到任一子任务完成并返回结果或两个子任务都失败。它调用 result(Function) 预览方法来获取捕获的结果。如果两个子任务都失败,则此方法将抛出一个 WebApplicationException,其中包含其中一个子任务的异常作为原因。

    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {

        scope.fork(() -> fetch(left));
        scope.fork(() -> fetch(right));

        scope.join();

        String result = scope.result(e -> new WebApplicationException(e));

        ...
    }
第二个示例创建一个ShutdownOnFailure对象来捕获第一个失败的子任务的异常,通过关闭任务范围取消另一个。主任务在 joinUntil(Instant) 中等待,直到两个子任务都完成并返回结果,任一失败,或达到截止日期。它调用 throwIfFailed(Function) 预览 如果任一子任务失败则抛出异常。如果两个子任务都成功完成,则此方法不执行任何操作。该示例使用 Supplier.get() 来获取每个子任务的结果。在常见情况下,使用 Supplier 而不是 Subtask 更为合适,因为由fork返回的对象仅用于获取成功完成的子任务的结果。
   Instant deadline = ...

   try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

        Supplier<String> supplier1 = scope.fork(() -> query(left));
        Supplier<String> supplier2 = scope.fork(() -> query(right));

        scope.joinUntil(deadline);

        scope.throwIfFailed(e -> new WebApplicationException(e));

        // 两个子任务都成功完成
        String result = Stream.of(supplier1, supplier2)
                .map(Supplier::get)
                .collect(Collectors.joining(", ", "{ ", " }"));

        ...
    }

扩展StructuredTaskScope

StructuredTaskScope 可以被扩展,并且可以重写 handleComplete 方法,以实现除 ShutdownOnSuccess ShutdownOnFailure 之外的策略。例如,子类可以收集成功完成的子任务的结果并忽略失败的子任务。当子任务失败时,它可以收集异常。它可以调用 shutdown 方法来关闭并在出现某种条件时唤醒 join join方法之后执行的代码中可用。收集结果并忽略失败的子任务的子类可能会定义一个返回结果的方法。实现在子任务失败时关闭的策略的子类可能会定义一个获取第一个失败的子任务的异常的方法。

StructuredTaskScope实现示例,用于收集成功完成的同类子任务。它定义了主任务在加入后可以调用的" completedSuccessfully()"方法。

    class CollectingScope<T> extends StructuredTaskScope<T> {
        private final Queue<Subtask<? extends T>> subtasks = new LinkedTransferQueue<>();

        @Override
        protected void handleComplete(Subtask<? extends T> subtask) {
            if (subtask.state() == Subtask.State.SUCCESS) {
                subtasks.add(subtask);
            }
        }

        @Override
        public CollectingScope<T> join() throws InterruptedException {
            super.join();
            return this;
        }

        public Stream<Subtask<? extends T>> completedSuccessfully() {
            super.ensureOwnerAndJoined();
            return subtasks.stream();
        }
    }

在示例中,completedSuccessfully() 方法的实现调用 ensureOwnerAndJoined() 来确保该方法只能被所有者线程调用,并且只能在加入后调用。

树结构

任务范围形成一个树形结构,当打开新的任务范围时,父子关系会隐式建立:
  • 当在任务范围中启动的线程打开自己的任务范围时,会建立父子关系。在任务范围“A”中启动的线程打开任务范围“B”时,建立父子关系,其中任务范围“A”是任务范围“B”的父级。
  • 通过嵌套建立父子关系。如果一个线程打开任务范围“B”,然后打开任务范围“C”(在关闭“B”之前),则封闭任务范围“B”是嵌套任务范围“C”的父级。
任务范围的 后代 是其父级任务范围,以及子任务范围的后代,递归地。

树结构支持:

  • 跨线程继承 scoped values预览
  • 封闭检查。方法描述中的短语“包含在任务范围中的线程”表示在任务范围或后代范围中启动的线程。

以下示例演示了作用域值的继承。一个作用域值 USERNAME 绑定到值 "duke"。创建一个 StructuredTaskScope 并调用其 fork 方法来启动一个线程执行 childTask。线程继承了创建任务范围时捕获的作用域值 bindings。在 childTask 中的代码使用作用域值的值,因此读取值 "duke"。

    private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();

    ScopedValue.runWhere(USERNAME, "duke", () -> {
        try (var scope = new StructuredTaskScope<String>()) {

            scope.fork(() -> childTask());
            ...
         }
    });

    ...

    String childTask() {
        String name = USERNAME.get();   // "duke"
        ...
    }

StructuredTaskScope 目前不定义公开树结构的 API。

除非另有说明,在此类中向构造函数或方法传递 null 参数将导致抛出 NullPointerException

内存一致性效果

在所有者线程中的操作,或者在任务范围中包含的线程,在 fork 子任务之前的操作 先行发生于该子任务采取的任何操作,这又 先行发生 于子任务结果被 检索预览 或者在 加入 任务范围后的线程中采取的任何操作。

参见 Java 语言规范:
17.4.5 Happens-before Order
自 JDK 版本:
21
  • Constructor Details

    • StructuredTaskScope

      public StructuredTaskScope(String name, ThreadFactory factory)
      使用给定的名称和线程工厂创建一个结构化任务范围。任务范围可选地用于监控和管理。当分叉子任务时,线程工厂用于 创建 线程。任务范围由当前线程拥有。

      构造函数捕获当前线程的 scoped value预览 绑定,以便线程在任务范围中启动时继承这些绑定。类描述中的 树结构 部分详细说明了为了继承 scoped value 绑定而隐式建立的父子关系。

      参数:
      name - 任务范围的名称,可以为 null
      factory - 线程工厂
    • StructuredTaskScope

      public StructuredTaskScope()
      创建一个创建虚拟线程的未命名结构化任务范围。任务范围由当前线程拥有。
      实现要求:
      此构造函数等效于使用名称为 null 和创建虚拟线程的线程工厂调用 2-参数构造函数。
  • Method Details

    • ensureOwnerAndJoined

      protected final void ensureOwnerAndJoined()
      确保当前线程是此任务范围的所有者,并且在 join()joinUntil(Instant) 加入后使用 分叉 子任务。
      API 注释:
      此方法可供定义方法的子类使用,以向打算在 join 方法之后执行的代码提供结果、状态或其他结果。
      抛出:
      WrongThreadException - 如果当前线程不是任务范围所有者
      IllegalStateException - 如果任务范围处于打开状态且任务范围所有者在分叉后未加入
    • handleComplete

      protected void handleComplete(StructuredTaskScope.SubtaskPREVIEW<? extends T> subtask)
      当子任务在此任务范围中成功完成或失败时由子任务调用。如果子任务在任务范围 关闭 后完成,则不会调用此方法。
      API 注释:
      handleComplete 方法应该是线程安全的。可能会同时由多个线程调用。
      实现要求:
      默认实现如果子任务为 null 则抛出 NullPointerException。如果子任务尚未完成,则抛出 IllegalArgumentException
      参数:
      subtask - 子任务
      抛出:
      IllegalArgumentException - 如果使用尚未完成的子任务调用
    • fork

      public <U extends T> StructuredTaskScope.SubtaskPREVIEW<U> fork(Callable<? extends U> task)
      在此任务范围中启动一个新线程以执行返回值任务,从而创建此任务范围的 子任务

      返回值任务作为 Callable 提供给此方法,线程执行任务的 call 方法。线程使用任务范围的 ThreadFactory 创建。它继承当前线程的 scoped value预览 绑定。这些绑定必须与创建任务范围时捕获的绑定相匹配。

      此方法返回一个 Subtask预览 以表示 分叉的子任务。当子任务成功完成时,可以使用 Subtask 对象获取结果,或者当子任务失败时获取异常。为确保正确使用,只有任务范围所有者在等待所有线程完成后才能调用 get()预览exception()预览 方法。当子任务完成时,线程调用 handleComplete 方法来消费已完成的子任务。如果任务范围在子任务完成之前被 关闭,则不会调用 handleComplete 方法。

      如果此任务范围已被 关闭(或正在关闭过程中),则子任务将不会运行,handleComplete 方法也不会被调用。

      此方法只能由任务范围所有者或包含在任务范围中的线程调用。

      实现要求:
      此方法可被重写以进行定制目的,例如包装任务。如果重写,子类必须调用 super.fork 来在此任务范围中启动一个新线程。
      类型参数:
      U - 结果类型
      参数:
      task - 线程要执行的返回值任务
      返回:
      子任务
      抛出:
      IllegalStateException - 如果此任务范围已关闭
      WrongThreadException - 如果当前线程不是任务范围所有者或包含在任务范围中的线程
      StructureViolationException预览 - 如果当前 scoped value 绑定与创建任务范围时不同
      RejectedExecutionException - 如果线程工厂拒绝创建线程来运行子任务
    • join

      等待此任务范围中所有子任务开始并完成,或任务范围关闭。

      此方法通过等待此任务范围中所有线程启动来等待所有子任务完成执行。当所有线程完成、任务范围被关闭或当前线程被中断时,停止等待。

      此方法只能由任务范围所有者调用。

      实现要求:
      可以重写此方法以进行定制目的或返回更具体的返回类型。如果重写,则子类必须调用super.join以确保方法等待此任务范围中的线程完成。
      返回:
      此任务范围
      抛出:
      IllegalStateException - 如果此任务范围已关闭
      WrongThreadException - 如果当前线程不是任务范围所有者
      InterruptedException - 在等待时被中断
    • joinUntil

      等待此任务范围中所有子任务开始并完成,或任务范围在给定截止时间之前关闭。

      此方法通过等待此任务范围中所有线程启动来等待所有子任务完成执行。当所有线程完成、任务范围被关闭、达到截止时间或当前线程被中断时,停止等待。

      此方法只能由任务范围所有者调用。

      实现要求:
      可以重写此方法以进行定制目的或返回更具体的返回类型。如果重写,则子类必须调用super.joinUntil以确保方法等待此任务范围中的线程完成。
      参数:
      deadline - 截止时间
      返回:
      此任务范围
      抛出:
      IllegalStateException - 如果此任务范围已关闭
      WrongThreadException - 如果当前线程不是任务范围所有者
      InterruptedException - 在等待时被中断
      TimeoutException - 如果等待时达到截止时间
    • shutdown

      public void shutdown()
      关闭此任务范围而不关闭它。关闭任务范围可防止新线程启动,中断所有未完成的线程,并导致join方法唤醒。关闭对于不再需要未完成子任务结果的情况很有用。通常会由实现了一项策略以在达到某种结果后丢弃未完成任务的子类的handleComplete(Subtask)实现调用。

      更具体地说,此方法:

      • 中断任务范围中所有未完成的线程(除了当前线程)。
      • 如果任务范围所有者正在join()joinUntil(Instant)中等待,则唤醒。如果任务范围所有者未在等待,则其下一次调用joinjoinUntil将立即返回。

      此方法只能由任务范围所有者或包含在任务范围中的线程调用。

      API注释:
      可能有一些线程尚未完成,因为它们正在执行未响应(或未及时响应)线程中断的代码。此方法不会等待这些线程。当所有者调用close()方法关闭任务范围时,它将等待剩余线程完成。
      实现要求:
      可以重写此方法以进行定制目的。如果重写,则子类必须调用super.shutdown以确保方法关闭任务范围。
      抛出:
      IllegalStateException - 如果此任务范围已关闭
      WrongThreadException - 如果当前线程不是任务范围所有者或包含在任务范围中的线程
      参见:
    • isShutdown

      public final boolean isShutdown()
      如果此任务范围已关闭,则返回true,否则返回false。
      返回:
      如果此任务范围已关闭,则返回true,否则返回false
      参见:
    • close

      public void close()
      关闭此任务范围。

      此方法首先关闭任务范围(就像调用shutdown方法一样)。然后等待执行任何未完成任务的线程。如果被中断,此方法将继续等待线程完成,然后以设置的中断状态完成。

      此方法只能由任务范围所有者调用。如果任务范围已关闭,则调用此方法的任务范围所有者不会产生任何效果。

      StructuredTaskScope旨在以结构化方式使用。如果调用此方法来关闭任务范围,而嵌套任务范围尚未关闭,则会关闭每个嵌套任务范围的基础构造(按创建顺序的逆序),关闭此任务范围,然后抛出StructureViolationExceptionPREVIEW。类似地,如果在绑定scoped valuePREVIEW的情况下调用此方法来关闭任务范围,并且任务范围是在绑定作用域值之前创建的,则在关闭任务范围后会抛出StructureViolationException。如果线程在关闭拥有的任务范围之前终止,则终止将导致关闭其所有打开任务范围的基础构造。关闭按照创建任务范围的逆序执行。因此,当任务范围所有者必须等待在这些任务范围中分叉的线程完成时,线程终止可能会延迟。

      指定者:
      close 在接口 AutoCloseable
      实现要求:
      可以重写此方法以进行定制目的。如果重写,则子类必须调用super.close以关闭任务范围。
      抛出:
      IllegalStateException - 在关闭任务范围后,如果任务范围所有者未尝试在分叉后加入,则抛出
      WrongThreadException - 如果当前线程不是任务范围所有者
      StructureViolationExceptionPREVIEW - 如果检测到结构违规