- 类型参数:
-
T
- 发布的项目类型
- 所有实现的接口:
-
AutoCloseable
,Flow.Publisher<T>
Flow.Publisher
。每个当前订阅者按照相同顺序接收新提交的项目,除非遇到丢弃或异常。使用SubmissionPublisher允许项目生成器充当符合reactive-streams的发布者,依赖于丢弃处理和/或阻塞进行流控制。
SubmissionPublisher在其构造函数中使用提供的Executor
来向订阅者传递。最佳的Executor选择取决于预期的使用情况。如果提交的项目的生成器在单独的线程中运行,并且订阅者的数量可以估计,则考虑使用Executors.newFixedThreadPool(int)
。否则考虑使用默认值,通常是ForkJoinPool.commonPool()
。
缓冲允许生产者和消费者以不同的速率短暂操作。每个订阅者使用独立的缓冲区。缓冲区在首次使用时创建,并根据需要扩展到给定的最大值。(强制执行的容量可能会四舍五入到最接近的二的幂,并/或受到此实现支持的最大值的限制。)调用request
不会直接导致缓冲区扩展,但如果未填充的请求超过最大容量,则会有饱和的风险。默认值Flow.defaultBufferSize()
可能提供一个基于预期速率、资源和用法选择容量的有用起点。
单个SubmissionPublisher可以在多个源之间共享。在发布项目或向每个订阅者发出信号之前,源线程中的操作在每个订阅者的相应访问之后发生。但是,关于滞后和需求的报告估计设计用于监视,而不用于同步控制,并且可能反映进度的过时或不准确的视图。
发布方法支持关于缓冲区饱和时该做什么的不同策略。方法submit
会阻塞,直到资源可用。这是最简单的,但响应最慢。offer
方法可能会丢弃项目(立即或在有界超时内),但提供了一个机会来插入一个处理程序,然后重试。
如果任何订阅者方法抛出异常,则其订阅将被取消。如果在构造函数参数中提供了处理程序,则在方法onNext
中发生异常之前,会在取消之前调用它,但在方法onSubscribe
、onError
和onComplete
中发生的异常不会被记录或处理。如果提供的Executor在尝试执行任务时抛出RejectedExecutionException
(或任何其他RuntimeException或Error),或者在处理丢弃的项目时,丢弃处理程序抛出异常,则异常将被重新抛出。在这些情况下,不是所有订阅者都会收到发布的项目。通常最好在这些情况下closeExceptionally
。
方法consume(Consumer)
简化了支持一个常见情况的订阅者的操作,即使用提供的函数请求和处理所有项目。
这个类也可以作为生成项目的子类的方便基类,并使用这个类中的方法来发布它们。例如,这里是一个定期发布从供应商生成的项目的类。(在实践中,您可能会添加方法来独立启动和停止生成,共享Executors在发布者之间,等等,或者使用SubmissionPublisher作为一个组件而不是一个超类。)
class PeriodicPublisher<T> extends SubmissionPublisher<T> {
final ScheduledFuture<?> periodicTask;
final ScheduledExecutorService scheduler;
PeriodicPublisher(Executor executor, int maxBufferCapacity,
Supplier<? extends T> supplier,
long period, TimeUnit unit) {
super(executor, maxBufferCapacity);
scheduler = new ScheduledThreadPoolExecutor(1);
periodicTask = scheduler.scheduleAtFixedRate(
() -> submit(supplier.get()), 0, period, unit);
}
public void close() {
periodicTask.cancel(false);
scheduler.shutdown();
super.close();
}
}
这里是一个Flow.Processor
实现的示例。它使用单步请求向其发布者简化说明。一个更自适应的版本可以使用从submit
返回的滞后估计来监视流,以及其他实用方法。
class TransformProcessor<S,T> extends SubmissionPublisher<T>
implements Flow.Processor<S,T> {
final Function<? super S, ? extends T> function;
Flow.Subscription subscription;
TransformProcessor(Executor executor, int maxBufferCapacity,
Function<? super S, ? extends T> function) {
super(executor, maxBufferCapacity);
this.function = function;
}
public void onSubscribe(Flow.Subscription subscription) {
(this.subscription = subscription).request(1);
}
public void onNext(S item) {
subscription.request(1);
submit(function.apply(item));
}
public void onError(Throwable ex) { closeExceptionally(ex); }
public void onComplete() { close(); }
}
- 自 JDK 9 起:
- 9
-
Constructor Summary
ConstructorDescription使用ForkJoinPool.commonPool()
创建一个新的SubmissionPublisher,用于向订阅者异步传递(除非它不支持至少两个并行级别,此时将为每个任务创建一个新线程),最大缓冲容量为Flow.defaultBufferSize()
,并且没有处理程序用于订阅者在方法onNext
中抛出异常。SubmissionPublisher
(Executor executor, int maxBufferCapacity) 使用给定的Executor创建一个新的SubmissionPublisher,用于向订阅者异步传递,每个订阅者的最大缓冲区大小为给定值,并且没有处理程序用于订阅者在方法onNext
中抛出异常。SubmissionPublisher
(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) 使用给定的Executor创建一个新的SubmissionPublisher,用于向订阅者异步传递,每个订阅者的最大缓冲区大小为给定值,并且如果非空,则在订阅者在方法onNext
中抛出异常时调用给定的处理程序。 -
Method Summary
Modifier and TypeMethodDescriptionvoid
close()
除非已关闭,否则向当前订阅者发出onComplete
信号,并禁止后续尝试发布。void
closeExceptionally
(Throwable error) 除非已关闭,否则向当前订阅者发出带有给定错误的onError
信号,并禁止后续尝试发布。使用给定的Consumer函数处理所有已发布的项目。int
返回当前所有订阅者中已生产但尚未消费的项目的最大数量的估计。long
返回当前所有订阅者中已请求(通过request
)但尚未生产的项目的最小数量的估计。返回与closeExceptionally
关联的异常,如果未关闭或正常关闭则返回null。返回用于异步传递的Executor。int
返回每个订阅者的最大缓冲容量。int
返回当前订阅者的数量。List
<Flow.Subscriber<? super T>> 返回当前订阅者的列表,用于监视和跟踪目的,而不是在订阅者上调用Flow.Subscriber
方法。boolean
如果此发布者有任何订阅者,则返回true。boolean
isClosed()
如果此发布者不接受提交,则返回true。boolean
isSubscribed
(Flow.Subscriber<? super T> subscriber) 如果给定的订阅者当前已订阅,则返回true。int
offer
(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) 将给定项目(如果可能)异步地发布给每个当前订阅者,通过调用其onNext
方法,当任何订阅的资源不可用时阻塞,直到指定的超时或调用线程被中断,此时调用给定处理程序(如果非空),如果处理程序返回true,则重试一次。int
offer
(T item, BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) 将给定项目(如果可能)异步地发布给每个当前订阅者,通过调用其onNext
方法。int
将给定项目异步地发布给每个当前订阅者,通过调用其onNext
方法,在任何订阅者的资源不可用时无法中断地阻塞。void
subscribe
(Flow.Subscriber<? super T> subscriber) 添加给定的订阅者,除非已经订阅。
-
Constructor Details
-
SubmissionPublisher
public SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) 使用给定的Executor创建一个新的SubmissionPublisher,用于向订阅者异步传递,每个订阅者的最大缓冲区大小为给定值,并且如果非空,则在订阅者在方法onNext
中抛出异常时调用给定的处理程序。- 参数:
-
executor
- 用于异步传递的执行程序,支持至少创建一个独立线程 -
maxBufferCapacity
- 每个订阅者缓冲区的最大容量(强制容量可能会四舍五入到最接近的2的幂,并/或受此实现支持的最大值限制;方法getMaxBufferCapacity()
返回实际值) -
handler
- 如果非空,在onNext
方法中抛出异常时要调用的过程 - 抛出:
-
NullPointerException
- 如果执行程序为null -
IllegalArgumentException
- 如果maxBufferCapacity不是正数
-
SubmissionPublisher
使用给定的执行程序创建一个新的SubmissionPublisher,用于向订阅者异步传递,每个订阅者的最大缓冲区大小为给定值,并且在onNext
方法中没有订阅者异常处理程序。- 参数:
-
executor
- 用于异步传递的执行程序,支持至少创建一个独立线程 -
maxBufferCapacity
- 每个订阅者缓冲区的最大容量(强制容量可能会四舍五入到最接近的2的幂,并/或受此实现支持的最大值限制;方法getMaxBufferCapacity()
返回实际值) - 抛出:
-
NullPointerException
- 如果执行程序为null -
IllegalArgumentException
- 如果maxBufferCapacity不是正数
-
SubmissionPublisher
public SubmissionPublisher()使用ForkJoinPool.commonPool()
创建一个新的SubmissionPublisher,用于向订阅者异步传递(除非它不支持至少两个并行级别,在这种情况下,将为每个任务创建一个新线程),每个订阅者的最大缓冲区容量为Flow.defaultBufferSize()
,在onNext
方法中没有订阅者异常处理程序。
-
-
Method Details
-
subscribe
添加给定的订阅者,除非已经订阅。如果已经订阅,则在现有订阅上调用订阅者的onError
方法,并使用IllegalStateException
。否则,成功时,异步调用订阅者的onSubscribe
方法,并使用新的Flow.Subscription
。如果onSubscribe
抛出异常,则取消订阅。否则,如果此SubmissionPublisher异常关闭,则调用订阅者的onError
方法,并使用相应的异常,或者如果没有异常关闭,则调用订阅者的onComplete
方法。订阅者可以通过调用新Subscription的request
方法启用接收项目,并可以通过调用其cancel
方法取消订阅。- 指定者:
-
subscribe
在接口Flow.Publisher<T>
- 参数:
-
subscriber
- 订阅者 - 抛出:
-
NullPointerException
- 如果订阅者为null
-
submit
通过异步调用其onNext
方法向每个当前订阅者发布给定项目,当任何订阅者的资源不可用时会阻塞。此方法返回所有当前订阅者中最大滞后(已提交但尚未消耗的项目数量)的估计值。如果有任何订阅者,则此值至少为一(考虑到此提交的项目),否则为零。如果此发布者的执行程序在尝试异步通知订阅者时抛出RejectedExecutionException(或任何其他RuntimeException或Error),则会重新抛出此异常,此时并非所有订阅者都已收到此项目。
- 参数:
-
item
- 要发布的(非null)项目 - 返回:
- 订阅者中的最大滞后的估计值
- 抛出:
-
IllegalStateException
- 如果已关闭 -
NullPointerException
- 如果项目为null -
RejectedExecutionException
- 如果执行程序抛出
-
offer
如果可能,通过异步调用其onNext
方法向每个当前订阅者发布给定项目。如果超出资源限制,一个或多个订阅者可能会丢弃项目,此时如果给定的处理程序(如果非null)返回true,则会重试一次。在处理程序调用时,其他线程对此类中的其他方法的调用将被阻塞。除非恢复得到保证,通常的选项仅限于记录错误和/或向订阅者发出onError
信号。此方法返回一个状态指示器:如果为负数,则表示丢弃的数量(向订阅者发出项目的尝试失败次数为负数)。否则,它是所有当前订阅者中最大滞后的估计值。如果有任何订阅者,则此值至少为一(考虑到此提交的项目),否则为零。
如果此发布者的执行程序在尝试异步通知订阅者时抛出RejectedExecutionException(或任何其他RuntimeException或Error),或者在处理丢弃项目时丢出丢弃处理程序,则会重新抛出此异常。
- 参数:
-
item
- 要发布的(非null)项目 -
onDrop
- 如果非null,当向订阅者丢弃项目时调用的处理程序,参数为订阅者和项目;如果返回true,则重新尝试一次 - 返回:
- 如果为负数,则为(负数)丢弃的数量;否则为最大滞后的估计值
- 抛出:
-
IllegalStateException
- 如果已关闭 -
NullPointerException
- 如果项目为null -
RejectedExecutionException
- 如果执行程序抛出
-
offer
public int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) 如果尚未关闭,则向当前订阅者发出onComplete
信号,并禁止后续尝试发布。返回时,此方法不保证所有订阅者已经完成。- Parameters:
-
item
- the (non-null) item to publish -
timeout
- how long to wait for resources for any subscriber before giving up, in units ofunit
-
unit
- aTimeUnit
determining how to interpret thetimeout
parameter -
onDrop
- if non-null, the handler invoked upon a drop to a subscriber, with arguments of the subscriber and item; if it returns true, an offer is re-attempted (once) - Returns:
- if negative, the (negative) number of drops; otherwise an estimate of maximum lag
- Throws:
-
IllegalStateException
- if closed -
NullPointerException
- if item is null -
RejectedExecutionException
- if thrown by Executor
-
close
public void close()Unless already closed, issuesonComplete
signals to current subscribers, and disallows subsequent attempts to publish. Upon return, this method does NOT guarantee that all subscribers have yet completed.- 指定者:
-
close
在接口AutoCloseable
-
closeExceptionally
除非已关闭,否则将错误传递给当前订阅者,并禁止后续尝试发布。未来的订阅者也会收到给定的错误。在返回时,此方法不保证所有订阅者都已完成。- 参数:
-
error
- 发送给订阅者的onError
参数 - 抛出:
-
NullPointerException
- 如果错误为null
-
isClosed
public boolean isClosed()如果此发布者不接受提交,则返回true。- 返回:
- 如果已关闭,则为true
-
getClosedException
返回与closeExceptionally
关联的异常,如果未关闭或正常关闭,则返回null。- 返回:
- 异常,如果没有则返回null
-
hasSubscribers
public boolean hasSubscribers()如果此发布者有任何订阅者,则返回true。- 返回:
- 如果此发布者有任何订阅者,则为true
-
getNumberOfSubscribers
public int getNumberOfSubscribers()返回当前订阅者的数量。- 返回:
- 当前订阅者的数量
-
getExecutor
返回用于异步传递的Executor。- 返回:
- 用于异步传递的Executor
-
getMaxBufferCapacity
public int getMaxBufferCapacity()返回每个订阅者的最大缓冲容量。- 返回:
- 每个订阅者的最大缓冲容量
-
getSubscribers
返回当前订阅者的列表,用于监视和跟踪目的,而不是在订阅者上调用Flow.Subscriber
方法。- 返回:
- 当前订阅者的列表
-
isSubscribed
返回给定订阅者当前是否已订阅。- 参数:
-
subscriber
- 订阅者 - 返回:
- 如果当前已订阅,则为true
- 抛出:
-
NullPointerException
- 如果订阅者为null
-
estimateMinimumDemand
public long estimateMinimumDemand()返回请求的最小项目数量的估计(通过request
),但尚未生成,所有当前订阅者中。- 返回:
- 估计值,如果没有订阅者则为零
-
estimateMaximumLag
public int estimateMaximumLag()返回已生产但尚未被所有当前订阅者消耗的项目的最大数量的估计。- 返回:
- 估计值
-
consume
使用给定的Consumer函数处理所有已发布的项目。返回一个CompletableFuture,当此发布者发出onComplete
信号时,将正常完成,或在任何错误时异常完成,或由Consumer抛出异常,或返回的CompletableFuture被取消时,此时不会处理更多项目。- 参数:
-
consumer
- 应用于每个onNext项目的函数 - 返回:
- 一个CompletableFuture,当发布者发出onComplete信号时,将正常完成,当出现任何错误或取消时将异常完成
- 抛出:
-
NullPointerException
- 如果consumer为null
-