STOMP 客户端

Spring提供了STOMP over WebSocket客户端和STOMP over TCP客户端。

首先,您可以创建和配置WebSocketStompClient,如下例所示:

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // 用于心跳

在上面的示例中,您可以将StandardWebSocketClient替换为SockJsClient,因为它也是WebSocketClient的实现之一。 SockJsClient可以使用WebSocket或基于HTTP的传输作为备用。有关更多详细信息,请参见SockJsClient

接下来,您可以建立连接并为STOMP会话提供处理程序,如下例所示:

String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);

当会话准备就绪时,处理程序会收到通知,如下例所示:

public class MyStompSessionHandler extends StompSessionHandlerAdapter {

	@Override
	public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
		// ...
	}
}

会话建立后,任何有效负载都可以发送,并且将使用配置的MessageConverter进行序列化,如下例所示:

session.send("/topic/something", "payload");

您还可以订阅目的地。 subscribe方法需要一个订阅消息的处理程序,并返回一个Subscription句柄,您可以使用该句柄取消订阅。对于每个接收的消息,处理程序可以指定应将有效负载反序列化为的目标Object类型,如下例所示:

session.subscribe("/topic/something", new StompFrameHandler() {

	@Override
	public Type getPayloadType(StompHeaders headers) {
		return String.class;
	}

	@Override
	public void handleFrame(StompHeaders headers, Object payload) {
		// ...
	}

});

要启用STOMP心跳,您可以使用TaskScheduler配置WebSocketStompClient,并可选择自定义心跳间隔(写入不活动的10秒,导致发送心跳,读取不活动的10秒,关闭连接)。

WebSocketStompClient仅在不活动时发送心跳,即当没有其他消息发送时。当使用外部代理时,这可能会带来挑战,因为具有非代理目的地的消息表示活动,但实际上不会转发到代理。在这种情况下,您可以在初始化外部代理时配置一个TaskScheduler,这样即使只发送具有非代理目的地的消息,也会向代理转发心跳。

当您使用WebSocketStompClient进行性能测试以模拟来自同一台计算机的数千个客户端时,请考虑关闭心跳,因为每个连接都会安排自己的心跳任务,这对于在同一台计算机上运行大量客户端并不是最佳选择。

STOMP协议还支持收据,其中客户端必须添加一个receipt头,服务器在处理发送或订阅后会用RECEIPT帧进行响应。为了支持这一点,StompSession提供了setAutoReceipt(boolean),这会导致在每次后续发送或订阅事件上添加一个receipt头。或者,您还可以手动向StompHeaders添加一个receipt头。发送和订阅都会返回一个Receiptable实例,您可以使用它来注册收据成功和失败的回调。对于此功能,您必须使用TaskScheduler配置客户端以及收据过期前的时间量(默认为15秒)。

请注意,StompSessionHandler本身是一个StompFrameHandler,这使其能够处理ERROR帧,除了用于处理消息的handleException回调和用于处理传输级错误的handleTransportError,包括ConnectionLostException