数据缓冲区和编解码器
Java NIO提供了ByteBuffer
,但许多库在其之上构建了自己的字节缓冲区API,特别是对于网络操作,其中重用缓冲区和/或使用直接缓冲区对性能有益。例如,Netty具有ByteBuf
层次结构,Undertow使用XNIO,Jetty使用带有回调释放的池化字节缓冲区,等等。 spring-core
模块提供了一组抽象,以便使用各种字节缓冲区API,如下所示:
-
DataBufferFactory
抽象了数据缓冲区的创建。 -
DataBuffer
表示一个字节缓冲区,可能是池化的。 -
DataBufferUtils
提供了用于数据缓冲区的实用方法。 -
编解码器将数据缓冲区流解码或编码为更高级别对象。
DataBufferFactory
DataBufferFactory
用于以以下两种方式之一创建数据缓冲区:
-
分配一个新的数据缓冲区,可选地提前指定容量,如果已知,这样做更有效,即使
DataBuffer
的实现可以根据需要增长和收缩。 -
包装现有的
byte[]
或java.nio.ByteBuffer
,它使用DataBuffer
实现装饰给定的数据,而不涉及分配。
请注意,WebFlux应用程序不直接创建DataBufferFactory
,而是通过客户端侧的ServerHttpResponse
或ClientHttpRequest
访问它。工厂的类型取决于底层的客户端或服务器,例如,对于Reactor Netty是NettyDataBufferFactory
,对于其他情况是DefaultDataBufferFactory
。
DataBuffer
DataBuffer
接口提供了与java.nio.ByteBuffer
类似的操作,但还带来了一些额外的好处,其中一些受到Netty ByteBuf
的启发。以下是一些好处的部分列表:
-
具有独立位置的读写,即不需要调用
flip()
来在读和写之间切换。 -
根据需要扩展容量,就像
java.lang.StringBuilder
一样。 -
通过
PooledDataBuffer
进行缓冲区池化和引用计数。 -
将缓冲区视为
java.nio.ByteBuffer
、InputStream
或OutputStream
。 -
确定给定字节的索引或最后一个索引。
PooledDataBuffer
如在ByteBuffer的Javadoc中所解释的,字节缓冲区可以是直接的或非直接的。直接缓冲区可能驻留在Java堆之外,这消除了对本机I/O操作的复制需求。这使得直接缓冲区特别适用于通过套接字接收和发送数据,但创建和释放直接缓冲区的成本更高,这导致了缓冲区池的概念。
PooledDataBuffer
是DataBuffer
的扩展,有助于引用计数,这对于字节缓冲区池是至关重要的。它是如何工作的?当分配PooledDataBuffer
时,引用计数为1。调用retain()
会增加计数,而调用release()
会减少计数。只要计数大于0,就保证不会释放缓冲区。当计数减少到0时,池化缓冲区可以被释放,实际上可能意味着为缓冲区保留的内存被返回到内存池。
请注意,大多数情况下,与直接操作PooledDataBuffer
相比,最好使用DataBufferUtils
中的便利方法,这些方法仅在DataBuffer
是PooledDataBuffer
的实例时才应用释放或保留。
DataBufferUtils
DataBufferUtils
提供了许多操作数据缓冲区的实用方法:
-
将数据缓冲区流连接成单个缓冲区,可能通过复合缓冲区实现零拷贝,例如,如果底层字节缓冲区API支持。
-
将
InputStream
或NIOChannel
转换为Flux<DataBuffer>
,反之亦然,将Publisher<DataBuffer>
转换为OutputStream
或NIOChannel
。 -
如果缓冲区是
PooledDataBuffer
的实例,则释放或保留DataBuffer
的方法。 -
跳过或从字节流中获取特定字节计数。
编解码器
org.springframework.core.codec
包提供了以下策略接口:
-
Encoder
将Publisher<T>
编码为数据缓冲区流。 -
Decoder
将Publisher<DataBuffer>
解码为更高级别对象流。
spring-core
模块提供了byte[]
、ByteBuffer
、DataBuffer
、Resource
和String
的编码器和解码器实现。 spring-web
模块添加了Jackson JSON、Jackson Smile、JAXB2、Protocol Buffers和其他编码器和解码器。请参阅WebFlux部分中的编解码器。
使用DataBuffer
在处理数据缓冲区时,必须特别注意确保释放缓冲区,因为它们可能是池化的。我们将使用编解码器来说明其工作原理,但这些概念更普遍适用。让我们看看编解码器在内部管理数据缓冲区时必须做些什么。
Decoder
是最后一个读取输入数据缓冲区的,然后创建更高级别对象,因此它必须释放它们,如下所示:
-
如果
Decoder
简单地读取每个输入缓冲区并准备立即释放它,它可以通过DataBufferUtils.release(dataBuffer)
这样做。 -
如果
Decoder
使用Flux
或Mono
操作符,如flatMap
、reduce
等,这些操作符在内部预取和缓存数据项,或者使用filter
、skip
等操作符,这些操作符会略过数据项,则必须将doOnDiscard(DataBuffer.class, DataBufferUtils::release)
添加到组合链中,以确保这些缓冲区在被丢弃之前被释放,可能还会因为错误或取消信号而被丢弃。 -
如果
Decoder
以任何其他方式保留一个或多个数据缓冲区,它必须确保在完全读取时释放它们,或者在缓存的数据缓冲区被读取和释放之前发生错误或取消信号的情况下释放它们。
请注意,DataBufferUtils#join
提供了一种安全有效的方法将数据缓冲区流聚合为单个数据缓冲区。同样,skipUntilByteCount
和takeUntilByteCount
是解码器使用的额外安全方法。
Encoder
分配数据缓冲区,其他必须读取(并释放)这些数据缓冲区。因此,Encoder
没有太多要做。但是,如果在使用数据填充缓冲区时发生序列化错误,Encoder
必须小心释放数据缓冲区。例如:
-
Java
-
Kotlin
DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
// serialize and populate buffer..
release = false;
}
finally {
if (release) {
DataBufferUtils.release(buffer);
}
}
return buffer;
val buffer = factory.allocateBuffer()
var release = true
try {
// serialize and populate buffer..
release = false
} finally {
if (release) {
DataBufferUtils.release(buffer)
}
}
return buffer
Encoder
的消费者负责释放其接收到的数据缓冲区。在WebFlux应用程序中,Encoder
的输出用于写入HTTP服务器响应,或者用于客户端HTTP请求,此时释放数据缓冲区是写入服务器响应或客户端请求的代码的责任。
请注意,在Netty上运行时,有关调试缓冲区泄漏的选项。