Publishers
生成由一个或多个Subscribers
消费的项目,每个项目由一个Subscription
管理。
这些接口对应于reactive-streams规范。它们适用于并发和分布式异步设置:所有(七个)方法都以void
的“单向”消息样式定义。通信依赖于一种简单的流控制形式(方法Flow.Subscription.request(long)
),可用于避免在“推”式系统中可能发生的资源管理问题。
示例。 一个Flow.Publisher
通常定义自己的Flow.Subscription
实现;在subscribe
方法中构造一个并将其分配给调用的Flow.Subscriber
。它异步地向订阅者发布项目,通常使用一个Executor
。例如,这里是一个非常简单的发布者,只在请求时发布一个TRUE
项目给单个订阅者。因为订阅者只接收一个项目,所以这个类不使用大多数实现中所需的缓冲和排序控制(例如SubmissionPublisher
)。
class OneShotPublisher implements Publisher<Boolean> {
private final ExecutorService executor = ForkJoinPool.commonPool(); // 基于守护线程
private boolean subscribed; // 第一次订阅后为true
public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
if (subscribed)
subscriber.onError(new IllegalStateException()); // 只允许一个
else {
subscribed = true;
subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
}
}
static class OneShotSubscription implements Subscription {
private final Subscriber<? super Boolean> subscriber;
private final ExecutorService executor;
private Future<?> future; // 用于取消
private boolean completed;
OneShotSubscription(Subscriber<? super Boolean> subscriber,
ExecutorService executor) {
this.subscriber = subscriber;
this.executor = executor;
}
public synchronized void request(long n) {
if (!completed) {
completed = true;
if (n <= 0) {
IllegalArgumentException ex = new IllegalArgumentException();
executor.execute(() -> subscriber.onError(ex));
} else {
future = executor.submit(() -> {
subscriber.onNext(Boolean.TRUE);
subscriber.onComplete();
});
}
}
}
public synchronized void cancel() {
completed = true;
if (future != null) future.cancel(false);
}
}
}
一个Flow.Subscriber
安排请求和处理项目。项目(调用Flow.Subscriber.onNext(T)
)只有在请求时才会发布,但可以请求多个项目。许多Subscriber实现可以按照以下示例的风格安排这一点,其中缓冲区大小为1,较大的大小通常允许更有效的重叠处理和更少的通信;例如,使用值64,这将保持32到64之间的总未完成请求。因为给定Flow.Subscription
的Subscriber方法调用是严格有序的,所以这些方法无需使用锁或volatile,除非Subscriber维护多个Subscriptions(在这种情况下,最好定义多个Subscribers,每个都有自己的Subscription)。
class SampleSubscriber<T> implements Subscriber<T> {
final Consumer<? super T> consumer;
Subscription subscription;
final long bufferSize;
long count;
SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
this.bufferSize = bufferSize;
this.consumer = consumer;
}
public void onSubscribe(Subscription subscription) {
long initialRequestSize = bufferSize;
count = bufferSize - bufferSize / 2; // 当一半被消耗时重新请求
(this.subscription = subscription).request(initialRequestSize);
}
public void onNext(T item) {
if (--count <= 0)
subscription.request(count = bufferSize - bufferSize / 2);
consumer.accept(item);
}
public void onError(Throwable ex) { ex.printStackTrace(); }
public void onComplete() {}
}
defaultBufferSize()
的默认值可能为基于预期速率、资源和用法选择Flow组件中的请求大小和容量提供一个有用的起点。或者,当永远不需要流控制时,订阅者可以最初请求有效无限数量的项目,如下所示:
class UnboundedSubscriber<T> implements Subscriber<T> {
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE); // 有效无限
}
public void onNext(T item) { use(item); }
public void onError(Throwable ex) { ex.printStackTrace(); }
public void onComplete() {}
void use(T item) { ... }
}
- 自版本:
- 9
-
Nested Class Summary
Modifier and TypeClassDescriptionstatic interface
作为Subscriber和Publisher的组件。static interface
项目(及相关控制消息)的生产者,由订阅者接收。static interface
消息的接收者。static interface
连接Flow.Publisher
和Flow.Subscriber
的消息控制。 -
Method Summary
Modifier and TypeMethodDescriptionstatic int
返回Publisher或Subscriber缓冲的默认值,可在没有其他约束的情况下使用。
-
Method Details
-
defaultBufferSize
public static int defaultBufferSize()返回Publisher或Subscriber缓冲的默认值,可在没有其他约束的情况下使用。- 实现注意:
- 当前返回的值为256。
- 返回:
- 缓冲区大小值
-