Module java.base

Class SubmissionPublisher<T>

java.lang.Object
java.util.concurrent.SubmissionPublisher<T>
类型参数:
T - 发布的项目类型
所有实现的接口:
AutoCloseable, Flow.Publisher<T>

public class SubmissionPublisher<T> extends Object implements Flow.Publisher<T>, AutoCloseable
一个异步地向当前订阅者发布(非空)项目直到关闭的Flow.Publisher。每个当前订阅者按照相同顺序接收新提交的项目,除非遇到丢弃或异常。使用SubmissionPublisher允许项目生成器充当符合reactive-streams的发布者,依赖于丢弃处理和/或阻塞进行流控制。

SubmissionPublisher在其构造函数中使用提供的Executor来向订阅者传递。最佳的Executor选择取决于预期的使用情况。如果提交的项目的生成器在单独的线程中运行,并且订阅者的数量可以估计,则考虑使用Executors.newFixedThreadPool(int)。否则考虑使用默认值,通常是ForkJoinPool.commonPool()

缓冲允许生产者和消费者以不同的速率短暂操作。每个订阅者使用独立的缓冲区。缓冲区在首次使用时创建,并根据需要扩展到给定的最大值。(强制执行的容量可能会四舍五入到最接近的二的幂,并/或受到此实现支持的最大值的限制。)调用request不会直接导致缓冲区扩展,但如果未填充的请求超过最大容量,则会有饱和的风险。默认值Flow.defaultBufferSize()可能提供一个基于预期速率、资源和用法选择容量的有用起点。

单个SubmissionPublisher可以在多个源之间共享。在发布项目或向每个订阅者发出信号之前,源线程中的操作在每个订阅者的相应访问之后发生。但是,关于滞后和需求的报告估计设计用于监视,而不用于同步控制,并且可能反映进度的过时或不准确的视图。

发布方法支持关于缓冲区饱和时该做什么的不同策略。方法submit会阻塞,直到资源可用。这是最简单的,但响应最慢。offer方法可能会丢弃项目(立即或在有界超时内),但提供了一个机会来插入一个处理程序,然后重试。

如果任何订阅者方法抛出异常,则其订阅将被取消。如果在构造函数参数中提供了处理程序,则在方法onNext中发生异常之前,会在取消之前调用它,但在方法onSubscribeonErroronComplete中发生的异常不会被记录或处理。如果提供的Executor在尝试执行任务时抛出RejectedExecutionException(或任何其他RuntimeException或Error),或者在处理丢弃的项目时,丢弃处理程序抛出异常,则异常将被重新抛出。在这些情况下,不是所有订阅者都会收到发布的项目。通常最好在这些情况下closeExceptionally

方法consume(Consumer)简化了支持一个常见情况的订阅者的操作,即使用提供的函数请求和处理所有项目。

这个类也可以作为生成项目的子类的方便基类,并使用这个类中的方法来发布它们。例如,这里是一个定期发布从供应商生成的项目的类。(在实践中,您可能会添加方法来独立启动和停止生成,共享Executors在发布者之间,等等,或者使用SubmissionPublisher作为一个组件而不是一个超类。)

 
 class PeriodicPublisher<T> extends SubmissionPublisher<T> {
   final ScheduledFuture<?> periodicTask;
   final ScheduledExecutorService scheduler;
   PeriodicPublisher(Executor executor, int maxBufferCapacity,
                     Supplier<? extends T> supplier,
                     long period, TimeUnit unit) {
     super(executor, maxBufferCapacity);
     scheduler = new ScheduledThreadPoolExecutor(1);
     periodicTask = scheduler.scheduleAtFixedRate(
       () -> submit(supplier.get()), 0, period, unit);
   }
   public void close() {
     periodicTask.cancel(false);
     scheduler.shutdown();
     super.close();
   }
 }

这里是一个Flow.Processor实现的示例。它使用单步请求向其发布者简化说明。一个更自适应的版本可以使用从submit返回的滞后估计来监视流,以及其他实用方法。

 
 class TransformProcessor<S,T> extends SubmissionPublisher<T>
   implements Flow.Processor<S,T> {
   final Function<? super S, ? extends T> function;
   Flow.Subscription subscription;
   TransformProcessor(Executor executor, int maxBufferCapacity,
                      Function<? super S, ? extends T> function) {
     super(executor, maxBufferCapacity);
     this.function = function;
   }
   public void onSubscribe(Flow.Subscription subscription) {
     (this.subscription = subscription).request(1);
   }
   public void onNext(S item) {
     subscription.request(1);
     submit(function.apply(item));
   }
   public void onError(Throwable ex) { closeExceptionally(ex); }
   public void onComplete() { close(); }
 }
自 JDK 9 起:
9
  • Constructor Summary

    Constructors
    Constructor
    Description
    使用ForkJoinPool.commonPool()创建一个新的SubmissionPublisher,用于向订阅者异步传递(除非它不支持至少两个并行级别,此时将为每个任务创建一个新线程),最大缓冲容量为Flow.defaultBufferSize(),并且没有处理程序用于订阅者在方法onNext中抛出异常。
    SubmissionPublisher(Executor executor, int maxBufferCapacity)
    使用给定的Executor创建一个新的SubmissionPublisher,用于向订阅者异步传递,每个订阅者的最大缓冲区大小为给定值,并且没有处理程序用于订阅者在方法onNext中抛出异常。
    SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)
    使用给定的Executor创建一个新的SubmissionPublisher,用于向订阅者异步传递,每个订阅者的最大缓冲区大小为给定值,并且如果非空,则在订阅者在方法onNext中抛出异常时调用给定的处理程序。
  • Method Summary

    Modifier and Type
    Method
    Description
    void
    close()
    除非已关闭,否则向当前订阅者发出onComplete信号,并禁止后续尝试发布。
    void
    除非已关闭,否则向当前订阅者发出带有给定错误的onError信号,并禁止后续尝试发布。
    consume(Consumer<? super T> consumer)
    使用给定的Consumer函数处理所有已发布的项目。
    int
    返回当前所有订阅者中已生产但尚未消费的项目的最大数量的估计。
    long
    返回当前所有订阅者中已请求(通过request)但尚未生产的项目的最小数量的估计。
    返回与closeExceptionally关联的异常,如果未关闭或正常关闭则返回null。
    返回用于异步传递的Executor。
    int
    返回每个订阅者的最大缓冲容量。
    int
    返回当前订阅者的数量。
    List<Flow.Subscriber<? super T>>
    返回当前订阅者的列表,用于监视和跟踪目的,而不是在订阅者上调用Flow.Subscriber方法。
    boolean
    如果此发布者有任何订阅者,则返回true。
    boolean
    如果此发布者不接受提交,则返回true。
    boolean
    isSubscribed(Flow.Subscriber<? super T> subscriber)
    如果给定的订阅者当前已订阅,则返回true。
    int
    offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
    将给定项目(如果可能)异步地发布给每个当前订阅者,通过调用其onNext方法,当任何订阅的资源不可用时阻塞,直到指定的超时或调用线程被中断,此时调用给定处理程序(如果非空),如果处理程序返回true,则重试一次。
    int
    offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
    将给定项目(如果可能)异步地发布给每个当前订阅者,通过调用其onNext方法。
    int
    submit(T item)
    将给定项目异步地发布给每个当前订阅者,通过调用其onNext方法,在任何订阅者的资源不可用时无法中断地阻塞。
    void
    subscribe(Flow.Subscriber<? super T> subscriber)
    添加给定的订阅者,除非已经订阅。

    Methods declared in class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Constructor Details

    • SubmissionPublisher

      public SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)
      使用给定的Executor创建一个新的SubmissionPublisher,用于向订阅者异步传递,每个订阅者的最大缓冲区大小为给定值,并且如果非空,则在订阅者在方法onNext中抛出异常时调用给定的处理程序。
      参数:
      executor - 用于异步传递的执行程序,支持至少创建一个独立线程
      maxBufferCapacity - 每个订阅者缓冲区的最大容量(强制容量可能会四舍五入到最接近的2的幂,并/或受此实现支持的最大值限制;方法getMaxBufferCapacity()返回实际值)
      handler - 如果非空,在onNext方法中抛出异常时要调用的过程
      抛出:
      NullPointerException - 如果执行程序为null
      IllegalArgumentException - 如果maxBufferCapacity不是正数
    • SubmissionPublisher

      public SubmissionPublisher(Executor executor, int maxBufferCapacity)
      使用给定的执行程序创建一个新的SubmissionPublisher,用于向订阅者异步传递,每个订阅者的最大缓冲区大小为给定值,并且在onNext方法中没有订阅者异常处理程序。
      参数:
      executor - 用于异步传递的执行程序,支持至少创建一个独立线程
      maxBufferCapacity - 每个订阅者缓冲区的最大容量(强制容量可能会四舍五入到最接近的2的幂,并/或受此实现支持的最大值限制;方法getMaxBufferCapacity()返回实际值)
      抛出:
      NullPointerException - 如果执行程序为null
      IllegalArgumentException - 如果maxBufferCapacity不是正数
    • SubmissionPublisher

      public SubmissionPublisher()
      使用ForkJoinPool.commonPool()创建一个新的SubmissionPublisher,用于向订阅者异步传递(除非它不支持至少两个并行级别,在这种情况下,将为每个任务创建一个新线程),每个订阅者的最大缓冲区容量为Flow.defaultBufferSize(),在onNext方法中没有订阅者异常处理程序。
  • Method Details

    • subscribe

      public void subscribe(Flow.Subscriber<? super T> subscriber)
      添加给定的订阅者,除非已经订阅。如果已经订阅,则在现有订阅上调用订阅者的onError方法,并使用IllegalStateException。否则,成功时,异步调用订阅者的onSubscribe方法,并使用新的Flow.Subscription。如果onSubscribe抛出异常,则取消订阅。否则,如果此SubmissionPublisher异常关闭,则调用订阅者的onError方法,并使用相应的异常,或者如果没有异常关闭,则调用订阅者的onComplete方法。订阅者可以通过调用新Subscription的request方法启用接收项目,并可以通过调用其cancel方法取消订阅。
      指定者:
      subscribe 在接口 Flow.Publisher<T>
      参数:
      subscriber - 订阅者
      抛出:
      NullPointerException - 如果订阅者为null
    • submit

      public int submit(T item)
      通过异步调用其onNext方法向每个当前订阅者发布给定项目,当任何订阅者的资源不可用时会阻塞。此方法返回所有当前订阅者中最大滞后(已提交但尚未消耗的项目数量)的估计值。如果有任何订阅者,则此值至少为一(考虑到此提交的项目),否则为零。

      如果此发布者的执行程序在尝试异步通知订阅者时抛出RejectedExecutionException(或任何其他RuntimeException或Error),则会重新抛出此异常,此时并非所有订阅者都已收到此项目。

      参数:
      item - 要发布的(非null)项目
      返回:
      订阅者中的最大滞后的估计值
      抛出:
      IllegalStateException - 如果已关闭
      NullPointerException - 如果项目为null
      RejectedExecutionException - 如果执行程序抛出
    • offer

      public int offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
      如果可能,通过异步调用其onNext方法向每个当前订阅者发布给定项目。如果超出资源限制,一个或多个订阅者可能会丢弃项目,此时如果给定的处理程序(如果非null)返回true,则会重试一次。在处理程序调用时,其他线程对此类中的其他方法的调用将被阻塞。除非恢复得到保证,通常的选项仅限于记录错误和/或向订阅者发出onError信号。

      此方法返回一个状态指示器:如果为负数,则表示丢弃的数量(向订阅者发出项目的尝试失败次数为负数)。否则,它是所有当前订阅者中最大滞后的估计值。如果有任何订阅者,则此值至少为一(考虑到此提交的项目),否则为零。

      如果此发布者的执行程序在尝试异步通知订阅者时抛出RejectedExecutionException(或任何其他RuntimeException或Error),或者在处理丢弃项目时丢出丢弃处理程序,则会重新抛出此异常。

      参数:
      item - 要发布的(非null)项目
      onDrop - 如果非null,当向订阅者丢弃项目时调用的处理程序,参数为订阅者和项目;如果返回true,则重新尝试一次
      返回:
      如果为负数,则为(负数)丢弃的数量;否则为最大滞后的估计值
      抛出:
      IllegalStateException - 如果已关闭
      NullPointerException - 如果项目为null
      RejectedExecutionException - 如果执行程序抛出
    • offer

      public int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
      如果尚未关闭,则向当前订阅者发出onComplete信号,并禁止后续尝试发布。返回时,此方法保证所有订阅者已经完成。
      Parameters:
      item - the (non-null) item to publish
      timeout - how long to wait for resources for any subscriber before giving up, in units of unit
      unit - a TimeUnit determining how to interpret the timeout parameter
      onDrop - if non-null, the handler invoked upon a drop to a subscriber, with arguments of the subscriber and item; if it returns true, an offer is re-attempted (once)
      Returns:
      if negative, the (negative) number of drops; otherwise an estimate of maximum lag
      Throws:
      IllegalStateException - if closed
      NullPointerException - if item is null
      RejectedExecutionException - if thrown by Executor
    • close

      public void close()
      Unless already closed, issues onComplete signals to current subscribers, and disallows subsequent attempts to publish. Upon return, this method does NOT guarantee that all subscribers have yet completed.
      指定者:
      close 在接口 AutoCloseable
    • closeExceptionally

      public void closeExceptionally(Throwable error)
      除非已关闭,否则将错误传递给当前订阅者,并禁止后续尝试发布。未来的订阅者也会收到给定的错误。在返回时,此方法不保证所有订阅者都已完成。
      参数:
      error - 发送给订阅者的onError参数
      抛出:
      NullPointerException - 如果错误为null
    • isClosed

      public boolean isClosed()
      如果此发布者不接受提交,则返回true。
      返回:
      如果已关闭,则为true
    • getClosedException

      public Throwable getClosedException()
      返回与closeExceptionally关联的异常,如果未关闭或正常关闭,则返回null。
      返回:
      异常,如果没有则返回null
    • hasSubscribers

      public boolean hasSubscribers()
      如果此发布者有任何订阅者,则返回true。
      返回:
      如果此发布者有任何订阅者,则为true
    • getNumberOfSubscribers

      public int getNumberOfSubscribers()
      返回当前订阅者的数量。
      返回:
      当前订阅者的数量
    • getExecutor

      public Executor getExecutor()
      返回用于异步传递的Executor。
      返回:
      用于异步传递的Executor
    • getMaxBufferCapacity

      public int getMaxBufferCapacity()
      返回每个订阅者的最大缓冲容量。
      返回:
      每个订阅者的最大缓冲容量
    • getSubscribers

      public List<Flow.Subscriber<? super T>> getSubscribers()
      返回当前订阅者的列表,用于监视和跟踪目的,而不是在订阅者上调用Flow.Subscriber方法。
      返回:
      当前订阅者的列表
    • isSubscribed

      public boolean isSubscribed(Flow.Subscriber<? super T> subscriber)
      返回给定订阅者当前是否已订阅。
      参数:
      subscriber - 订阅者
      返回:
      如果当前已订阅,则为true
      抛出:
      NullPointerException - 如果订阅者为null
    • estimateMinimumDemand

      public long estimateMinimumDemand()
      返回请求的最小项目数量的估计(通过request),但尚未生成,所有当前订阅者中。
      返回:
      估计值,如果没有订阅者则为零
    • estimateMaximumLag

      public int estimateMaximumLag()
      返回已生产但尚未被所有当前订阅者消耗的项目的最大数量的估计。
      返回:
      估计值
    • consume

      public CompletableFuture<Void> consume(Consumer<? super T> consumer)
      使用给定的Consumer函数处理所有已发布的项目。返回一个CompletableFuture,当此发布者发出onComplete信号时,将正常完成,或在任何错误时异常完成,或由Consumer抛出异常,或返回的CompletableFuture被取消时,此时不会处理更多项目。
      参数:
      consumer - 应用于每个onNext项目的函数
      返回:
      一个CompletableFuture,当发布者发出onComplete信号时,将正常完成,当出现任何错误或取消时将异常完成
      抛出:
      NullPointerException - 如果consumer为null