Module java.base

Class Flow

java.lang.Object
java.util.concurrent.Flow

public final class Flow extends Object
用于建立流控制组件的相关接口和静态方法,其中Publishers生成由一个或多个Subscribers消费的项目,每个项目由一个Subscription管理。

这些接口对应于reactive-streams规范。它们适用于并发和分布式异步设置:所有(七个)方法都以void的“单向”消息样式定义。通信依赖于一种简单的流控制形式(方法Flow.Subscription.request(long)),可用于避免在“推”式系统中可能发生的资源管理问题。

示例。 一个Flow.Publisher通常定义自己的Flow.Subscription实现;在subscribe方法中构造一个并将其分配给调用的Flow.Subscriber。它异步地向订阅者发布项目,通常使用一个Executor。例如,这里是一个非常简单的发布者,只在请求时发布一个TRUE项目给单个订阅者。因为订阅者只接收一个项目,所以这个类不使用大多数实现中所需的缓冲和排序控制(例如SubmissionPublisher)。

 
 class OneShotPublisher implements Publisher<Boolean> {
   private final ExecutorService executor = ForkJoinPool.commonPool(); // 基于守护线程
   private boolean subscribed; // 第一次订阅后为true
   public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
     if (subscribed)
       subscriber.onError(new IllegalStateException()); // 只允许一个
     else {
       subscribed = true;
       subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
     }
   }
   static class OneShotSubscription implements Subscription {
     private final Subscriber<? super Boolean> subscriber;
     private final ExecutorService executor;
     private Future<?> future; // 用于取消
     private boolean completed;
     OneShotSubscription(Subscriber<? super Boolean> subscriber,
                         ExecutorService executor) {
       this.subscriber = subscriber;
       this.executor = executor;
     }
     public synchronized void request(long n) {
       if (!completed) {
         completed = true;
         if (n <= 0) {
           IllegalArgumentException ex = new IllegalArgumentException();
           executor.execute(() -> subscriber.onError(ex));
         } else {
           future = executor.submit(() -> {
             subscriber.onNext(Boolean.TRUE);
             subscriber.onComplete();
           });
         }
       }
     }
     public synchronized void cancel() {
       completed = true;
       if (future != null) future.cancel(false);
     }
   }
 }

一个Flow.Subscriber安排请求和处理项目。项目(调用Flow.Subscriber.onNext(T))只有在请求时才会发布,但可以请求多个项目。许多Subscriber实现可以按照以下示例的风格安排这一点,其中缓冲区大小为1,较大的大小通常允许更有效的重叠处理和更少的通信;例如,使用值64,这将保持32到64之间的总未完成请求。因为给定Flow.Subscription的Subscriber方法调用是严格有序的,所以这些方法无需使用锁或volatile,除非Subscriber维护多个Subscriptions(在这种情况下,最好定义多个Subscribers,每个都有自己的Subscription)。

 
 class SampleSubscriber<T> implements Subscriber<T> {
   final Consumer<? super T> consumer;
   Subscription subscription;
   final long bufferSize;
   long count;
   SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
     this.bufferSize = bufferSize;
     this.consumer = consumer;
   }
   public void onSubscribe(Subscription subscription) {
     long initialRequestSize = bufferSize;
     count = bufferSize - bufferSize / 2; // 当一半被消耗时重新请求
     (this.subscription = subscription).request(initialRequestSize);
   }
   public void onNext(T item) {
     if (--count <= 0)
       subscription.request(count = bufferSize - bufferSize / 2);
     consumer.accept(item);
   }
   public void onError(Throwable ex) { ex.printStackTrace(); }
   public void onComplete() {}
 }

defaultBufferSize()的默认值可能为基于预期速率、资源和用法选择Flow组件中的请求大小和容量提供一个有用的起点。或者,当永远不需要流控制时,订阅者可以最初请求有效无限数量的项目,如下所示:

 
 class UnboundedSubscriber<T> implements Subscriber<T> {
   public void onSubscribe(Subscription subscription) {
     subscription.request(Long.MAX_VALUE); // 有效无限
   }
   public void onNext(T item) { use(item); }
   public void onError(Throwable ex) { ex.printStackTrace(); }
   public void onComplete() {}
   void use(T item) { ... }
 }
自版本:
9
  • Method Details

    • defaultBufferSize

      public static int defaultBufferSize()
      返回Publisher或Subscriber缓冲的默认值,可在没有其他约束的情况下使用。
      实现注意:
      当前返回的值为256。
      返回:
      缓冲区大小值