RSocket
本节描述了Spring Framework对RSocket协议的支持。
概述
RSocket是一种应用程序协议,用于在TCP、WebSocket和其他字节流传输上进行多路复用、双向通信,使用以下交互模型之一:
-
请求-响应
— 发送一条消息并接收一条回复。 -
请求-流
— 发送一条消息并接收一系列消息。 -
通道
— 在双向发送消息流。 -
发送并忘记
— 发送单向消息。
一旦建立初始连接,"客户端"与"服务器"的区别消失,因为双方变得对称,每一方都可以发起上述交互之一。这就是为什么在协议中将参与方称为"请求者"和"响应者",而上述交互称为"请求流"或简称"请求"。
RSocket协议的关键特性和优势如下:
-
Reactive Streams跨网络边界的语义 — 对于诸如
请求-流
和通道
之类的流式请求,背压信号在请求者和响应者之间传递,允许请求者在源头减慢响应者的速度,从而减少对网络层拥塞控制的依赖,以及在网络层或任何层级上的缓冲需求。 -
请求节流 — 此功能称为"租约",可以从每一端发送
LEASE
帧以限制对方在给定时间内允许的总请求数。租约会定期更新。 -
会话恢复 — 这是为了解决连接丢失而设计的,需要保持一些状态。状态管理对应用程序是透明的,并且与背压结合使用效果很好,可以在可能的情况下停止生产者并减少所需的状态量。
-
大消息的分片和重组。
-
保持活动(心跳)。
RSocket在多种语言中都有实现。Java库建立在Project Reactor和传输层的Reactor Netty之上。这意味着应用程序中来自Reactive Streams发布者的信号可以透明地通过RSocket传播到网络中。
协议
RSocket的一个优点是它在传输中具有明确定义的行为,以及易于阅读的规范,以及一些协议扩展。因此,独立于语言实现和更高级别框架API,阅读规范是一个好主意。本节提供了一个简洁的概述以建立一些上下文。
连接
最初,客户端通过一些低级别的流传输(如TCP或WebSocket)连接到服务器,并向服务器发送一个SETUP
帧以设置连接的参数。
服务器可能会拒绝SETUP
帧,但通常在发送(对于客户端)和接收(对于服务器)之后,双方可以开始发出请求,除非SETUP
指示使用租约语义来限制请求的数量,在这种情况下,双方都必须等待对方发送LEASE
帧以允许发出请求。
发出请求
建立连接后,双方都可以通过REQUEST_RESPONSE
、REQUEST_STREAM
、REQUEST_CHANNEL
或REQUEST_FNF
帧之一发起请求。这些帧中的每一个都携带一个消息,从请求者到响应者。
然后,响应者可以返回带有响应消息的PAYLOAD
帧,对于REQUEST_CHANNEL
,请求者还可以发送带有更多请求消息的PAYLOAD
帧。
当请求涉及消息流(如请求-流
和通道
)时,响应者必须遵守请求者的需求信号。需求以消息数量表示。初始需求在REQUEST_STREAM
和REQUEST_CHANNEL
帧中指定。随后的需求通过REQUEST_N
帧进行信号传递。
每一方还可以通过METADATA_PUSH
帧发送元数据通知,这些通知不涉及任何单独的请求,而是涉及整个连接。
消息格式
RSocket消息包含数据和元数据。元数据可用于发送路由、安全令牌等。数据和元数据可以以不同的格式进行格式化。每个的MIME类型在SETUP
帧中声明,并适用于给定连接上的所有请求。
虽然所有消息都可以具有元数据,但通常像路由这样的元数据是针对每个请求的,因此仅包含在请求的第一条消息中,即使用REQUEST_RESPONSE
、REQUEST_STREAM
、REQUEST_CHANNEL
或REQUEST_FNF
帧之一。
协议扩展定义了用于应用程序的常见元数据格式:
Java实现
RSocket的Java实现建立在Project Reactor上。TCP和WebSocket的传输建立在Reactor Netty上。作为一个Reactive Streams库,Reactor简化了实现协议的工作。对于应用程序来说,使用具有声明性操作符和透明背压支持的Flux
和Mono
是一个自然的选择。
RSocket Java中的API故意保持最小和基本。它专注于协议特性,并将应用程序编程模型(例如RPC代码生成与其他模型)作为更高级别、独立的关注点。
主要的契约io.rsocket.RSocket使用Mono
表示单个消息的承诺,Flux
表示消息流,io.rsocket.Payload
表示实际消息,具有对数据和元数据的访问权限。RSocket
契约被对称地使用。对于请求,应用程序会获得一个RSocket
来执行请求。对于响应,应用程序实现RSocket
来处理请求。
这并不意味着要进行全面介绍。在大多数情况下,Spring应用程序不必直接使用其API。但是,独立于Spring查看或实验RSocket可能很重要。RSocket Java存储库包含许多示例应用程序,演示了其API和协议特性。
Spring支持
spring-messaging
模块包含以下内容:
-
RSocketRequester — 通过
io.rsocket.RSocket
进行请求的流畅API,包括数据和元数据的编码/解码。 -
注解响应者 — 用于响应的
@MessageMapping
和@RSocketExchange
注解处理程序方法。 -
RSocket接口 — RSocket服务声明为Java接口,具有
@RSocketExchange
方法,可用作请求者或响应者。
spring-web
模块包含RSocket应用程序可能需要的Encoder
和Decoder
实现,如Jackson CBOR/JSON和Protobuf。它还包含PathPatternParser
,可用于有效的路由匹配。
Spring Boot 2.2支持在TCP或WebSocket上启动RSocket服务器,包括在WebFlux服务器中公开RSocket over WebSocket的选项。还提供了RSocketRequester.Builder
和RSocketStrategies
的客户端支持和自动配置。有关更多详细信息,请参阅Spring Boot参考中的RSocket部分。
Spring Security 5.2提供了RSocket支持。
Spring Integration 5.2提供了与RSocket客户端和服务器交互的入站和出站网关。有关更多详细信息,请参阅Spring Integration参考手册。
Spring Cloud Gateway支持RSocket连接。
RSocket请求者
RSocketRequester
提供了一个流畅的API来执行RSocket请求,接受和返回对象作为数据和元数据,而不是低级数据缓冲区。它可以对称地使用,从客户端发出请求,也可以从服务器发出请求。
客户端请求者
在客户端端获取一个RSocketRequester
是连接到服务器的过程,涉及发送一个带有连接设置的RSocket SETUP
帧。 RSocketRequester
提供了一个构建器,帮助准备一个io.rsocket.core.RSocketConnector
,包括SETUP
帧的连接设置。
这是使用默认设置连接的最基本方式:
-
Java
-
Kotlin
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)
上述代码不会立即连接。当发出请求时,会自动建立一个共享连接并使用。
连接设置
RSocketRequester.Builder
提供以下内容来自定义初始的SETUP
帧:
-
dataMimeType(MimeType)
— 设置连接上数据的媒体类型。 -
metadataMimeType(MimeType)
— 设置连接上元数据的媒体类型。 -
setupData(Object)
— 包含在SETUP
中的数据。 -
setupRoute(String, Object…)
— 包含在SETUP
中的元数据中的路由。 -
setupMetadata(Object, MimeType)
— 包含在SETUP
中的其他元数据。
对于数据,默认的媒体类型是从第一个配置的Decoder
派生的。对于元数据,默认的媒体类型是允许每个请求多个元数据值和媒体类型对的复合元数据。通常情况下,两者都不需要更改。
SETUP
帧中的数据和元数据是可选的。在服务器端,可以使用@ConnectMapping方法来处理连接的开始和SETUP
帧的内容。元数据可用于连接级别的安全性。
策略
RSocketRequester.Builder
接受RSocketStrategies
来配置请求者。您需要使用这个来提供编码器和解码器,用于数据和元数据值的序列化和反序列化。默认情况下,仅注册了来自spring-core
的基本编解码器,用于String
、byte[]
和ByteBuffer
。添加spring-web
提供了更多可注册的内容,如下所示:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
RSocketStrategies
设计用于重复使用。在某些场景下,例如客户端和服务器在同一个应用程序中,最好在Spring配置中声明它。
客户端响应器
RSocketRequester.Builder
可用于配置响应来自服务器的请求。
您可以使用带注释的处理程序来基于与服务器相同的基础架构进行客户端响应,但以编程方式注册如下:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher()) (1)
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder)) (3)
.tcp("localhost", 7000);
1 | 如果存在 spring-web ,请使用 PathPatternRouteMatcher 进行有效路由匹配。 |
2 | 从带有 @MessageMapping 和/或 @ConnectMapping 方法的类创建响应器。 |
3 | 注册响应器。 |
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher()) (1)
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) } (3)
.tcp("localhost", 7000)
1 | 如果存在 spring-web ,请使用 PathPatternRouteMatcher 进行有效路由匹配。 |
2 | 从带有 @MessageMapping 和/或 @ConnectMapping 方法的类创建响应器。 |
3 | 注册响应器。 |
请注意,上述仅是用于以编程方式注册客户端响应器的快捷方式。对于客户端响应器在Spring配置中的替代方案,您仍然可以将 RSocketMessageHandler
声明为Spring bean,然后应用如下:
-
Java
-
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
对于上述情况,您可能还需要在 RSocketMessageHandler
中使用 setHandlerPredicate
来切换到不同的策略以检测客户端响应器,例如基于自定义注解如 @RSocketClientResponder
而不是默认的 @Controller
。在具有客户端和服务器或同一应用程序中的多个客户端的情况下,这是必要的。
另请参阅 带注释的响应器,了解更多关于编程模型的信息。
高级
RSocketRequesterBuilder
提供了一个回调来公开底层的 io.rsocket.core.RSocketConnector
,以便进一步配置选项,如保持活动间隔、会话恢复、拦截器等。您可以在该级别配置选项如下:
-
Java
-
Kotlin
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}
.tcp("localhost", 7000)
服务器请求者
从服务器向连接的客户端发出请求,关键在于从服务器获取连接客户端的请求者。
在注解响应者中,@ConnectMapping
和@MessageMapping
方法支持一个RSocketRequester
参数。使用它来访问连接的客户端的请求者。请记住,@ConnectMapping
方法本质上是SETUP
帧的处理程序,必须在请求开始之前处理。因此,在最开始的请求必须与处理分离。例如:
-
Java
-
Kotlin
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { (1)
// ...
});
return ... (2)
}
1 | 异步启动请求,与处理分离。 |
2 | 执行处理并返回完成的Mono<Void> 。 |
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
// ...
}
}
/// ... (2)
}
1 | 异步启动请求,与处理分离。 |
2 | 在挂起函数中执行处理。 |
请求
-
Java
-
Kotlin
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlux(AirportLocation.class); (3)
1 | 指定要包含在请求消息元数据中的路由。 |
2 | 为请求消息提供数据。 |
3 | 声明预期的响应。 |
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlow<AirportLocation>() (3)
1 | 指定要包含在请求消息元数据中的路由。 |
2 | 为请求消息提供数据。 |
3 | 声明预期的响应。 |
请求-流
,因为发送一个值并接收一系列值。在大多数情况下,只要输入和输出的选择与RSocket交互类型匹配,并且与响应者期望的输入和输出类型匹配,您就不需要考虑这一点。唯一的无效组合示例是多对一。
data(Object)
方法还接受任何Reactive StreamsPublisher
,包括Flux
和Mono
,以及任何其他值生产者,该值生产者在ReactiveAdapterRegistry
中注册。对于像Flux
这样产生相同类型值的多值Publisher
,考虑使用重载的data
方法之一,以避免在每个元素上进行类型检查和Encoder
查找:
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
data(Object)
步骤是可选的。对于不发送数据的请求,请跳过它:
-
Java
-
Kotlin
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
如果使用复合元数据
(默认)并且这些值受到注册的Encoder
支持,可以添加额外的元数据值。例如:
-
Java
-
Kotlin
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
对于Fire-and-Forget
,使用返回Mono<Void>
的send()
方法。请注意,Mono
仅表示消息已成功发送,而不表示已处理。
对于Metadata-Push
,请使用带有Mono<Void>
返回值的sendMetadata()
方法。
注解响应者
RSocket响应者可以作为@MessageMapping
和@ConnectMapping
方法实现。 @MessageMapping
方法处理单个请求,而@ConnectMapping
方法处理连接级事件(设置和元数据推送)。注解响应者支持对称地,用于从服务器端响应和从客户端端响应。
服务器响应者
要在服务器端使用注解响应者,请将RSocketMessageHandler
添加到Spring配置中,以便检测带有@MessageMapping
和@ConnectMapping
方法的@Controller
bean:
-
Java
-
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.routeMatcher(new PathPatternRouteMatcher());
return handler;
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
routeMatcher = PathPatternRouteMatcher()
}
}
然后通过Java RSocket API启动一个RSocket服务器,并将RSocketMessageHandler
插入响应者如下:
-
Java
-
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
CloseableChannel server =
RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.block();
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val server = RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.awaitSingle()
RSocketMessageHandler
默认支持复合和路由元数据。如果需要切换到不同的MIME类型或注册其他元数据MIME类型,可以设置其MetadataExtractor。
您需要设置编码器和解码器实例以支持元数据和数据格式。您可能需要使用spring-web
模块来获取编解码器实现。
默认情况下,使用AntPathMatcher
通过SimpleRouteMatcher
匹配路由。我们建议使用spring-web
中的PathPatternRouteMatcher
进行有效的路由匹配。RSocket路由可以是分层的,但不是URL路径。两个路由匹配器默认配置为使用“.”作为分隔符,并且与HTTP URL不同,没有URL解码。
RSocketMessageHandler
可以通过RSocketStrategies
进行配置,如果需要在同一进程中的客户端和服务器之间共享配置,则可能很有用:
-
Java
-
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.routeMatcher(new PathPatternRouteMatcher())
.build();
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
rSocketStrategies = rsocketStrategies()
}
@Bean
fun rsocketStrategies() = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.routeMatcher(PathPatternRouteMatcher())
.build()
}
客户端响应者
客户端端的注解响应者需要在RSocketRequester.Builder
中进行配置。详情请参见客户端响应者。
@MessageMapping
-
Java
-
Kotlin
@Controller
public class RadarsController {
@MessageMapping("locate.radars.within")
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
@Controller
class RadarsController {
@MessageMapping("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
上述@MessageMapping
方法响应具有路由“locate.radars.within”的请求流交互。它支持具有以下方法参数的灵活方法签名:
方法参数 | 描述 |
---|---|
|
请求的有效负载。这可以是 注意:注解的使用是可选的。不是简单类型且不是其他支持的参数之一的方法参数,被假定为预期的有效负载。 |
|
用于向远程端发出请求的请求器。 |
|
从映射模式中基于变量提取的值,例如 |
|
作为描述在MetadataExtractor中注册的元数据值。 |
|
作为描述在MetadataExtractor中注册的所有元数据值。 |
期望的返回值应为一个或多个要序列化为响应有效负载的对象。这可以是Mono
或Flux
等异步类型,具体值,或者是void
或Mono<Void>
等无值异步类型。
@MessageMapping
方法支持的RSocket交互类型是根据输入(即@Payload
参数)和输出的基数确定的,其中基数表示以下内容:
基数 | 描述 |
---|---|
1 |
明确值,或单值异步类型,如 |
多个 |
多值异步类型,如 |
0 |
对于输入,这意味着方法没有 对于输出,这是 |
下表显示了所有输入和输出基数组合及相应的交互类型:
输入基数 | 输出基数 | 交互类型 |
---|---|---|
0, 1 |
0 |
Fire-and-Forget, Request-Response |
0, 1 |
1 |
Request-Response |
0, 1 |
Many |
Request-Stream |
Many |
0, 1, Many |
Request-Channel |
@RSocketExchange
作为@MessageMapping
的替代方案,还可以使用@RSocketExchange
方法处理请求。这种方法在RSocket接口上声明,并可以通过RSocketServiceProxyFactory
用作请求器,或由响应器实现。
例如,作为响应器处理请求:
-
Java
-
Kotlin
public interface RadarsService {
@RSocketExchange("locate.radars.within")
Flux<AirportLocation> radars(MapRequest request);
}
@Controller
public class RadarsController implements RadarsService {
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
interface RadarsService {
@RSocketExchange("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation>
}
@Controller
class RadarsController : RadarsService {
override fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
@RSocketExchange
与@MessageMapping
之间存在一些差异,因为前者需要保持适用于请求器和响应器的用途。例如,虽然@MessageMapping
可以声明处理任意数量的路由,且每个路由可以是模式,但@RSocketExchange
必须使用单个、具体的路由进行声明。在与元数据相关的支持方法参数方面也存在一些小差异,请参阅@MessageMapping和RSocket接口以获取支持的参数列表。
@RSocketExchange
可以在类型级别使用,为给定的RSocket服务接口的所有路由指定一个公共前缀。
@ConnectMapping
@ConnectMapping
处理RSocket连接开始时的SETUP
帧,以及通过METADATA_PUSH
帧进行的任何后续元数据推送通知,即io.rsocket.RSocket
中的metadataPush(Payload)
。
@ConnectMapping
方法支持与@MessageMapping相同的参数,但基于SETUP
和METADATA_PUSH
帧中的元数据和数据。 @ConnectMapping
可以具有模式,以将处理范围缩小到具有元数据中路由的特定连接,或者如果未声明模式,则所有连接都匹配。
@ConnectMapping
方法不能返回数据,必须声明返回值为void
或Mono<Void>
。如果处理对新连接返回错误,则连接将被拒绝。处理不能被阻塞以向连接的RSocketRequester
发出请求。有关详细信息,请参见服务器请求者。
元数据提取器
响应者必须解释元数据。复合元数据允许独立格式化的元数据值(例如用于路由、安全、跟踪),每个值都有自己的 MIME 类型。应用程序需要一种配置支持的元数据 MIME 类型的方式,以及一种访问提取值的方式。
MetadataExtractor
是一个合同,用于接收序列化的元数据并返回解码后的名称-值对,然后可以通过名称访问,例如通过注解处理程序方法中的 @Header
。
DefaultMetadataExtractor
可以提供 Decoder
实例来解码元数据。它内置支持 "message/x.rsocket.routing.v0",它将其解码为 String
并保存在 "route" 键下。对于任何其他 MIME 类型,您需要提供一个 Decoder
并按以下方式注册 MIME 类型:
-
Java
-
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")
复合元数据很好地结合了独立的元数据值。但请求者可能不支持复合元数据,或者可能选择不使用它。对于这种情况,DefaultMetadataExtractor
可能需要自定义逻辑来将解码后的值映射到输出映射中。以下是一个使用 JSON 作为元数据的示例:
-
Java
-
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
MimeType.valueOf("application/vnd.myapp.metadata+json"),
new ParameterizedTypeReference<Map<String,String>>() {},
(jsonMap, outputMap) -> {
outputMap.putAll(jsonMap);
});
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
outputMap.putAll(jsonMap)
}
通过 RSocketStrategies
配置 MetadataExtractor
时,您可以让 RSocketStrategies.Builder
使用配置的解码器创建提取器,并简单地使用回调来自定义注册,如下所示:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.metadataExtractorRegistry(registry -> {
registry.metadataToExtract(fooMimeType, Foo.class, "foo");
// ...
})
.build();
import org.springframework.messaging.rsocket.metadataToExtract
val strategies = RSocketStrategies.builder()
.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
registry.metadataToExtract<Foo>(fooMimeType, "foo")
// ...
}
.build()
RSocket 接口
Spring Framework 允许您将 RSocket 服务定义为具有 @RSocketExchange
方法的 Java 接口。您可以将这样的接口传递给 RSocketServiceProxyFactory
,以创建通过 RSocketRequester 执行请求的代理。您还可以将接口实现为处理请求的响应者。
首先创建具有 @RSocketExchange
方法的接口:
interface RadarService {
@RSocketExchange("radars")
Flux<AirportLocation> getRadars(@Payload MapRequest request);
// 更多 RSocket 交换方法...
}
现在,您可以创建一个在调用方法时执行请求的代理:
RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();
RadarService service = factory.createClient(RadarService.class);
您还可以实现接口作为响应者处理请求。请参阅 带注解的响应者。
方法参数
带注解的 RSocket 交换方法支持具有以下方法参数的灵活方法签名:
方法参数 | 描述 |
---|---|
|
添加路由变量以传递给 |
|
设置请求的输入负载。这可以是具体值,也可以是任何可以适应响应式流 |
|
输入负载中元数据条目的值。只要下一个参数是元数据条目 |
|
元数据条目的 |