RSocket

本节描述了Spring Framework对RSocket协议的支持。

概述

RSocket是一种应用程序协议,用于在TCP、WebSocket和其他字节流传输上进行多路复用、双向通信,使用以下交互模型之一:

  • 请求-响应 — 发送一条消息并接收一条回复。

  • 请求-流 — 发送一条消息并接收一系列消息。

  • 通道 — 在双向发送消息流。

  • 发送并忘记 — 发送单向消息。

一旦建立初始连接,"客户端"与"服务器"的区别消失,因为双方变得对称,每一方都可以发起上述交互之一。这就是为什么在协议中将参与方称为"请求者"和"响应者",而上述交互称为"请求流"或简称"请求"。

RSocket协议的关键特性和优势如下:

  • Reactive Streams跨网络边界的语义 — 对于诸如请求-流通道之类的流式请求,背压信号在请求者和响应者之间传递,允许请求者在源头减慢响应者的速度,从而减少对网络层拥塞控制的依赖,以及在网络层或任何层级上的缓冲需求。

  • 请求节流 — 此功能称为"租约",可以从每一端发送LEASE帧以限制对方在给定时间内允许的总请求数。租约会定期更新。

  • 会话恢复 — 这是为了解决连接丢失而设计的,需要保持一些状态。状态管理对应用程序是透明的,并且与背压结合使用效果很好,可以在可能的情况下停止生产者并减少所需的状态量。

  • 大消息的分片和重组。

  • 保持活动(心跳)。

RSocket在多种语言中都有实现。Java库建立在Project Reactor和传输层的Reactor Netty之上。这意味着应用程序中来自Reactive Streams发布者的信号可以透明地通过RSocket传播到网络中。

协议

RSocket的一个优点是它在传输中具有明确定义的行为,以及易于阅读的规范,以及一些协议扩展。因此,独立于语言实现和更高级别框架API,阅读规范是一个好主意。本节提供了一个简洁的概述以建立一些上下文。

连接

最初,客户端通过一些低级别的流传输(如TCP或WebSocket)连接到服务器,并向服务器发送一个SETUP帧以设置连接的参数。

服务器可能会拒绝SETUP帧,但通常在发送(对于客户端)和接收(对于服务器)之后,双方可以开始发出请求,除非SETUP指示使用租约语义来限制请求的数量,在这种情况下,双方都必须等待对方发送LEASE帧以允许发出请求。

发出请求

建立连接后,双方都可以通过REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNELREQUEST_FNF帧之一发起请求。这些帧中的每一个都携带一个消息,从请求者到响应者。

然后,响应者可以返回带有响应消息的PAYLOAD帧,对于REQUEST_CHANNEL,请求者还可以发送带有更多请求消息的PAYLOAD帧。

当请求涉及消息流(如请求-流通道)时,响应者必须遵守请求者的需求信号。需求以消息数量表示。初始需求在REQUEST_STREAMREQUEST_CHANNEL帧中指定。随后的需求通过REQUEST_N帧进行信号传递。

每一方还可以通过METADATA_PUSH帧发送元数据通知,这些通知不涉及任何单独的请求,而是涉及整个连接。

消息格式

RSocket消息包含数据和元数据。元数据可用于发送路由、安全令牌等。数据和元数据可以以不同的格式进行格式化。每个的MIME类型在SETUP帧中声明,并适用于给定连接上的所有请求。

虽然所有消息都可以具有元数据,但通常像路由这样的元数据是针对每个请求的,因此仅包含在请求的第一条消息中,即使用REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNELREQUEST_FNF帧之一。

协议扩展定义了用于应用程序的常见元数据格式:

  • 复合元数据 — 多个、独立格式化的元数据条目。

  • 路由 — 请求的路由。

Java实现

RSocket的Java实现建立在Project Reactor上。TCP和WebSocket的传输建立在Reactor Netty上。作为一个Reactive Streams库,Reactor简化了实现协议的工作。对于应用程序来说,使用具有声明性操作符和透明背压支持的FluxMono是一个自然的选择。

RSocket Java中的API故意保持最小和基本。它专注于协议特性,并将应用程序编程模型(例如RPC代码生成与其他模型)作为更高级别、独立的关注点。

主要的契约io.rsocket.RSocket使用Mono表示单个消息的承诺,Flux表示消息流,io.rsocket.Payload表示实际消息,具有对数据和元数据的访问权限。RSocket契约被对称地使用。对于请求,应用程序会获得一个RSocket来执行请求。对于响应,应用程序实现RSocket来处理请求。

这并不意味着要进行全面介绍。在大多数情况下,Spring应用程序不必直接使用其API。但是,独立于Spring查看或实验RSocket可能很重要。RSocket Java存储库包含许多示例应用程序,演示了其API和协议特性。

Spring支持

spring-messaging模块包含以下内容:

  • RSocketRequester — 通过io.rsocket.RSocket进行请求的流畅API,包括数据和元数据的编码/解码。

  • 注解响应者 — 用于响应的@MessageMapping@RSocketExchange注解处理程序方法。

  • RSocket接口 — RSocket服务声明为Java接口,具有@RSocketExchange方法,可用作请求者或响应者。

spring-web模块包含RSocket应用程序可能需要的EncoderDecoder实现,如Jackson CBOR/JSON和Protobuf。它还包含PathPatternParser,可用于有效的路由匹配。

Spring Boot 2.2支持在TCP或WebSocket上启动RSocket服务器,包括在WebFlux服务器中公开RSocket over WebSocket的选项。还提供了RSocketRequester.BuilderRSocketStrategies的客户端支持和自动配置。有关更多详细信息,请参阅Spring Boot参考中的RSocket部分

Spring Security 5.2提供了RSocket支持。

Spring Integration 5.2提供了与RSocket客户端和服务器交互的入站和出站网关。有关更多详细信息,请参阅Spring Integration参考手册。

Spring Cloud Gateway支持RSocket连接。

RSocket请求者

RSocketRequester提供了一个流畅的API来执行RSocket请求,接受和返回对象作为数据和元数据,而不是低级数据缓冲区。它可以对称地使用,从客户端发出请求,也可以从服务器发出请求。

客户端请求者

在客户端端获取一个RSocketRequester是连接到服务器的过程,涉及发送一个带有连接设置的RSocket SETUP帧。 RSocketRequester提供了一个构建器,帮助准备一个io.rsocket.core.RSocketConnector,包括SETUP帧的连接设置。

这是使用默认设置连接的最基本方式:

  • Java

  • Kotlin

RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);

URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)

URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)

上述代码不会立即连接。当发出请求时,会自动建立一个共享连接并使用。

连接设置

RSocketRequester.Builder提供以下内容来自定义初始的SETUP帧:

  • dataMimeType(MimeType) — 设置连接上数据的媒体类型。

  • metadataMimeType(MimeType) — 设置连接上元数据的媒体类型。

  • setupData(Object) — 包含在SETUP中的数据。

  • setupRoute(String, Object…​) — 包含在SETUP中的元数据中的路由。

  • setupMetadata(Object, MimeType) — 包含在SETUP中的其他元数据。

对于数据,默认的媒体类型是从第一个配置的Decoder派生的。对于元数据,默认的媒体类型是允许每个请求多个元数据值和媒体类型对的复合元数据。通常情况下,两者都不需要更改。

SETUP帧中的数据和元数据是可选的。在服务器端,可以使用@ConnectMapping方法来处理连接的开始和SETUP帧的内容。元数据可用于连接级别的安全性。

策略

RSocketRequester.Builder接受RSocketStrategies来配置请求者。您需要使用这个来提供编码器和解码器,用于数据和元数据值的序列化和反序列化。默认情况下,仅注册了来自spring-core的基本编解码器,用于Stringbyte[]ByteBuffer。添加spring-web提供了更多可注册的内容,如下所示:

  • Java

  • Kotlin

RSocketStrategies strategies = RSocketStrategies.builder()
	.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
	.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
	.build();

RSocketRequester requester = RSocketRequester.builder()
	.rsocketStrategies(strategies)
	.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
		.encoders { it.add(Jackson2CborEncoder()) }
		.decoders { it.add(Jackson2CborDecoder()) }
		.build()

val requester = RSocketRequester.builder()
		.rsocketStrategies(strategies)
		.tcp("localhost", 7000)

RSocketStrategies设计用于重复使用。在某些场景下,例如客户端和服务器在同一个应用程序中,最好在Spring配置中声明它。

客户端响应器

RSocketRequester.Builder 可用于配置响应来自服务器的请求。

您可以使用带注释的处理程序来基于与服务器相同的基础架构进行客户端响应,但以编程方式注册如下:

  • Java

  • Kotlin

RSocketStrategies strategies = RSocketStrategies.builder()
	.routeMatcher(new PathPatternRouteMatcher())  (1)
	.build();

SocketAcceptor responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(responder)) (3)
	.tcp("localhost", 7000);
1 如果存在 spring-web,请使用 PathPatternRouteMatcher 进行有效路由匹配。
2 从带有 @MessageMapping 和/或 @ConnectMapping 方法的类创建响应器。
3 注册响应器。
val strategies = RSocketStrategies.builder()
		.routeMatcher(PathPatternRouteMatcher())  (1)
		.build()

val responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(responder) } (3)
		.tcp("localhost", 7000)
1 如果存在 spring-web,请使用 PathPatternRouteMatcher 进行有效路由匹配。
2 从带有 @MessageMapping 和/或 @ConnectMapping 方法的类创建响应器。
3 注册响应器。

请注意,上述仅是用于以编程方式注册客户端响应器的快捷方式。对于客户端响应器在Spring配置中的替代方案,您仍然可以将 RSocketMessageHandler 声明为Spring bean,然后应用如下:

  • Java

  • Kotlin

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(handler.responder()))
	.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(handler.responder()) }
		.tcp("localhost", 7000)

对于上述情况,您可能还需要在 RSocketMessageHandler 中使用 setHandlerPredicate 来切换到不同的策略以检测客户端响应器,例如基于自定义注解如 @RSocketClientResponder 而不是默认的 @Controller。在具有客户端和服务器或同一应用程序中的多个客户端的情况下,这是必要的。

另请参阅 带注释的响应器,了解更多关于编程模型的信息。

高级

RSocketRequesterBuilder 提供了一个回调来公开底层的 io.rsocket.core.RSocketConnector,以便进一步配置选项,如保持活动间隔、会话恢复、拦截器等。您可以在该级别配置选项如下:

  • Java

  • Kotlin

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> {
		// ...
	})
	.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
		.rsocketConnector {
			//...
		}
		.tcp("localhost", 7000)

服务器请求者

从服务器向连接的客户端发出请求,关键在于从服务器获取连接客户端的请求者。

注解响应者中,@ConnectMapping@MessageMapping方法支持一个RSocketRequester参数。使用它来访问连接的客户端的请求者。请记住,@ConnectMapping方法本质上是SETUP帧的处理程序,必须在请求开始之前处理。因此,在最开始的请求必须与处理分离。例如:

  • Java

  • Kotlin

@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
	requester.route("status").data("5")
		.retrieveFlux(StatusReport.class)
		.subscribe(bar -> { (1)
			// ...
		});
	return ... (2)
}
1 异步启动请求,与处理分离。
2 执行处理并返回完成的Mono<Void>
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
	GlobalScope.launch {
		requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
			// ...
		}
	}
	/// ... (2)
}
1 异步启动请求,与处理分离。
2 在挂起函数中执行处理。

请求

一旦您拥有客户端服务器请求者,您可以按照以下方式发出请求:

  • Java

  • Kotlin

ViewBox viewBox = ... ;

Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlux(AirportLocation.class); (3)
1 指定要包含在请求消息元数据中的路由。
2 为请求消息提供数据。
3 声明预期的响应。
val viewBox: ViewBox = ...

val locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlow<AirportLocation>() (3)
1 指定要包含在请求消息元数据中的路由。
2 为请求消息提供数据。
3 声明预期的响应。
请求-流,因为发送一个值并接收一系列值。在大多数情况下,只要输入和输出的选择与RSocket交互类型匹配,并且与响应者期望的输入和输出类型匹配,您就不需要考虑这一点。唯一的无效组合示例是多对一。

data(Object)方法还接受任何Reactive StreamsPublisher,包括FluxMono,以及任何其他值生产者,该值生产者在ReactiveAdapterRegistry中注册。对于像Flux这样产生相同类型值的多值Publisher,考虑使用重载的data方法之一,以避免在每个元素上进行类型检查和Encoder查找:

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

data(Object)步骤是可选的。对于不发送数据的请求,请跳过它:

  • Java

  • Kotlin

Mono<AirportLocation> location = requester.route("find.radar.EWR"))
	.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait

val location = requester.route("find.radar.EWR")
	.retrieveAndAwait<AirportLocation>()

如果使用复合元数据(默认)并且这些值受到注册的Encoder支持,可以添加额外的元数据值。例如:

  • Java

  • Kotlin

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");

Flux<AirportLocation> locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow

val requester: RSocketRequester = ...

val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")

val locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlow<AirportLocation>()

对于Fire-and-Forget,使用返回Mono<Void>send()方法。请注意,Mono仅表示消息已成功发送,而不表示已处理。

对于Metadata-Push,请使用带有Mono<Void>返回值的sendMetadata()方法。

注解响应者

RSocket响应者可以作为@MessageMapping@ConnectMapping方法实现。 @MessageMapping方法处理单个请求,而@ConnectMapping方法处理连接级事件(设置和元数据推送)。注解响应者支持对称地,用于从服务器端响应和从客户端端响应。

服务器响应者

要在服务器端使用注解响应者,请将RSocketMessageHandler添加到Spring配置中,以便检测带有@MessageMapping@ConnectMapping方法的@Controller bean:

  • Java

  • Kotlin

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.routeMatcher(new PathPatternRouteMatcher());
		return handler;
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		routeMatcher = PathPatternRouteMatcher()
	}
}

然后通过Java RSocket API启动一个RSocket服务器,并将RSocketMessageHandler插入响应者如下:

  • Java

  • Kotlin

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

CloseableChannel server =
	RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.block();
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val server = RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.awaitSingle()

RSocketMessageHandler默认支持复合路由元数据。如果需要切换到不同的MIME类型或注册其他元数据MIME类型,可以设置其MetadataExtractor

您需要设置编码器和解码器实例以支持元数据和数据格式。您可能需要使用spring-web模块来获取编解码器实现。

默认情况下,使用AntPathMatcher通过SimpleRouteMatcher匹配路由。我们建议使用spring-web中的PathPatternRouteMatcher进行有效的路由匹配。RSocket路由可以是分层的,但不是URL路径。两个路由匹配器默认配置为使用“.”作为分隔符,并且与HTTP URL不同,没有URL解码。

RSocketMessageHandler可以通过RSocketStrategies进行配置,如果需要在同一进程中的客户端和服务器之间共享配置,则可能很有用:

  • Java

  • Kotlin

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.setRSocketStrategies(rsocketStrategies());
		return handler;
	}

	@Bean
	public RSocketStrategies rsocketStrategies() {
		return RSocketStrategies.builder()
			.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
			.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
			.routeMatcher(new PathPatternRouteMatcher())
			.build();
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		rSocketStrategies = rsocketStrategies()
	}

	@Bean
	fun rsocketStrategies() = RSocketStrategies.builder()
			.encoders { it.add(Jackson2CborEncoder()) }
			.decoders { it.add(Jackson2CborDecoder()) }
			.routeMatcher(PathPatternRouteMatcher())
			.build()
}

客户端响应者

客户端端的注解响应者需要在RSocketRequester.Builder中进行配置。详情请参见客户端响应者

@MessageMapping

一旦配置了服务器客户端响应器配置,可以如下使用@MessageMapping方法:

  • Java

  • Kotlin

@Controller
public class RadarsController {

	@MessageMapping("locate.radars.within")
	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
@Controller
class RadarsController {

	@MessageMapping("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

上述@MessageMapping方法响应具有路由“locate.radars.within”的请求流交互。它支持具有以下方法参数的灵活方法签名:

方法参数 描述

@Payload

请求的有效负载。这可以是MonoFlux等异步类型的具体值。

注意:注解的使用是可选的。不是简单类型且不是其他支持的参数之一的方法参数,被假定为预期的有效负载。

RSocketRequester

用于向远程端发出请求的请求器。

@DestinationVariable

从映射模式中基于变量提取的值,例如@MessageMapping("find.radar.{id}")

@Header

作为描述在MetadataExtractor中注册的元数据值。

@Headers Map<String, Object>

作为描述在MetadataExtractor中注册的所有元数据值。

期望的返回值应为一个或多个要序列化为响应有效负载的对象。这可以是MonoFlux等异步类型,具体值,或者是voidMono<Void>等无值异步类型。

@MessageMapping方法支持的RSocket交互类型是根据输入(即@Payload参数)和输出的基数确定的,其中基数表示以下内容:

基数 描述

1

明确值,或单值异步类型,如Mono<T>

多个

多值异步类型,如Flux<T>

0

对于输入,这意味着方法没有@Payload参数。

对于输出,这是void或无值异步类型,如Mono<Void>

下表显示了所有输入和输出基数组合及相应的交互类型:

输入基数 输出基数 交互类型

0, 1

0

Fire-and-Forget, Request-Response

0, 1

1

Request-Response

0, 1

Many

Request-Stream

Many

0, 1, Many

Request-Channel

@RSocketExchange

作为@MessageMapping的替代方案,还可以使用@RSocketExchange方法处理请求。这种方法在RSocket接口上声明,并可以通过RSocketServiceProxyFactory用作请求器,或由响应器实现。

例如,作为响应器处理请求:

  • Java

  • Kotlin

public interface RadarsService {

	@RSocketExchange("locate.radars.within")
	Flux<AirportLocation> radars(MapRequest request);
}

@Controller
public class RadarsController implements RadarsService {

	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
interface RadarsService {

	@RSocketExchange("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation>
}

@Controller
class RadarsController : RadarsService {

	override fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

@RSocketExchange@MessageMapping之间存在一些差异,因为前者需要保持适用于请求器和响应器的用途。例如,虽然@MessageMapping可以声明处理任意数量的路由,且每个路由可以是模式,但@RSocketExchange必须使用单个、具体的路由进行声明。在与元数据相关的支持方法参数方面也存在一些小差异,请参阅@MessageMappingRSocket接口以获取支持的参数列表。

@RSocketExchange可以在类型级别使用,为给定的RSocket服务接口的所有路由指定一个公共前缀。

@ConnectMapping

@ConnectMapping处理RSocket连接开始时的SETUP帧,以及通过METADATA_PUSH帧进行的任何后续元数据推送通知,即io.rsocket.RSocket中的metadataPush(Payload)

@ConnectMapping方法支持与@MessageMapping相同的参数,但基于SETUPMETADATA_PUSH帧中的元数据和数据。 @ConnectMapping可以具有模式,以将处理范围缩小到具有元数据中路由的特定连接,或者如果未声明模式,则所有连接都匹配。

@ConnectMapping方法不能返回数据,必须声明返回值为voidMono<Void>。如果处理对新连接返回错误,则连接将被拒绝。处理不能被阻塞以向连接的RSocketRequester发出请求。有关详细信息,请参见服务器请求者

元数据提取器

响应者必须解释元数据。复合元数据允许独立格式化的元数据值(例如用于路由、安全、跟踪),每个值都有自己的 MIME 类型。应用程序需要一种配置支持的元数据 MIME 类型的方式,以及一种访问提取值的方式。

MetadataExtractor 是一个合同,用于接收序列化的元数据并返回解码后的名称-值对,然后可以通过名称访问,例如通过注解处理程序方法中的 @Header

DefaultMetadataExtractor 可以提供 Decoder 实例来解码元数据。它内置支持 "message/x.rsocket.routing.v0",它将其解码为 String 并保存在 "route" 键下。对于任何其他 MIME 类型,您需要提供一个 Decoder 并按以下方式注册 MIME 类型:

  • Java

  • Kotlin

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")

复合元数据很好地结合了独立的元数据值。但请求者可能不支持复合元数据,或者可能选择不使用它。对于这种情况,DefaultMetadataExtractor 可能需要自定义逻辑来将解码后的值映射到输出映射中。以下是一个使用 JSON 作为元数据的示例:

  • Java

  • Kotlin

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
	MimeType.valueOf("application/vnd.myapp.metadata+json"),
	new ParameterizedTypeReference<Map<String,String>>() {},
	(jsonMap, outputMap) -> {
		outputMap.putAll(jsonMap);
	});
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
	outputMap.putAll(jsonMap)
}

通过 RSocketStrategies 配置 MetadataExtractor 时,您可以让 RSocketStrategies.Builder 使用配置的解码器创建提取器,并简单地使用回调来自定义注册,如下所示:

  • Java

  • Kotlin

RSocketStrategies strategies = RSocketStrategies.builder()
	.metadataExtractorRegistry(registry -> {
		registry.metadataToExtract(fooMimeType, Foo.class, "foo");
		// ...
	})
	.build();
import org.springframework.messaging.rsocket.metadataToExtract

val strategies = RSocketStrategies.builder()
		.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
			registry.metadataToExtract<Foo>(fooMimeType, "foo")
			// ...
		}
		.build()

RSocket 接口

Spring Framework 允许您将 RSocket 服务定义为具有 @RSocketExchange 方法的 Java 接口。您可以将这样的接口传递给 RSocketServiceProxyFactory,以创建通过 RSocketRequester 执行请求的代理。您还可以将接口实现为处理请求的响应者。

首先创建具有 @RSocketExchange 方法的接口:

interface RadarService {

	@RSocketExchange("radars")
	Flux<AirportLocation> getRadars(@Payload MapRequest request);

	// 更多 RSocket 交换方法...

}

现在,您可以创建一个在调用方法时执行请求的代理:

RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();

RadarService service = factory.createClient(RadarService.class);

您还可以实现接口作为响应者处理请求。请参阅 带注解的响应者

方法参数

带注解的 RSocket 交换方法支持具有以下方法参数的灵活方法签名:

方法参数 描述

@DestinationVariable

添加路由变量以传递给 RSocketRequester,以及来自 @RSocketExchange 注解的路由,以扩展路由中的模板占位符。此变量可以是字符串或任何对象,然后通过 toString() 格式化。

@Payload

设置请求的输入负载。这可以是具体值,也可以是任何可以适应响应式流 Publisher 的值的生产者,通过 ReactiveAdapterRegistry 适配。

Object,如果后跟 MimeType

输入负载中元数据条目的值。只要下一个参数是元数据条目 MimeType,此值可以是任何 Object,只要该值是单个值的生产者,可以通过 ReactiveAdapterRegistry 适应为响应式流 Publisher

MimeType

元数据条目的 MimeType。预期前一个方法参数是元数据值。

返回值

带注解的 RSocket 交换方法支持具体值或任何可以适应响应式流 Publisher 的值的生产者作为返回值。

默认情况下,具有同步(阻塞)方法签名的 RSocket 服务方法的行为取决于底层 RSocket ClientTransport 的响应超时设置以及 RSocket 保持活动设置。 RSocketServiceProxyFactory.Builder 还公开了一个 blockTimeout 选项,还允许您配置阻塞响应的最长时间,但我们建议在 RSocket 级别配置超时值以获得更多控制。