基于注解的监听器端点

接收异步消息的最简单方式是使用注解的监听器端点基础设施。简而言之,它允许您将托管bean的方法公开为JMS监听器端点。以下示例展示了如何使用它:

@Component
public class MyService {

	@JmsListener(destination = "myDestination")
	public void processOrder(String data) { ... }
}

前面示例的思想是,每当在jakarta.jms.Destination myDestination上有消息可用时,相应地调用processOrder方法(在本例中,使用JMS消息的内容,类似于MessageListenerAdapter提供的方式)。

注解端点基础设施为每个带注解的方法在后台创建一个消息监听器容器,使用JmsListenerContainerFactory。这样的容器不会注册到应用程序上下文中,但可以通过使用JmsListenerEndpointRegistry bean轻松定位以进行管理。

@JmsListener是Java 8上的可重复注解,因此您可以通过向其添加额外的@JmsListener声明,将多个JMS目的地与同一方法关联。

启用监听器端点注解

要启用对@JmsListener注解的支持,您可以将@EnableJms添加到您的一个@Configuration类中,如下例所示:

@Configuration
@EnableJms
public class AppConfig {

	@Bean
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory());
		factory.setDestinationResolver(destinationResolver());
		factory.setSessionTransacted(true);
		factory.setConcurrency("3-10");
		return factory;
	}
}

默认情况下,基础设施会查找名为jmsListenerContainerFactory的bean作为用于创建消息监听器容器的工厂的来源。在这种情况下(并忽略JMS基础设施设置),您可以使用核心池大小为三个线程和最大池大小为十个线程调用processOrder方法。

您可以为每个注解自定义监听器容器工厂,或者通过实现JmsListenerConfigurer接口配置一个显式默认值。只有在至少注册了一个端点而没有特定容器工厂的情况下才需要默认值。有关实现JmsListenerConfigurer的类的javadoc详细信息和示例,请参见JmsListenerConfigurer

如果您更喜欢XML配置,您可以使用<jms:annotation-driven>元素,如下例所示:

<jms:annotation-driven/>

<bean id="jmsListenerContainerFactory"
		class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destinationResolver" ref="destinationResolver"/>
	<property name="sessionTransacted" value="true"/>
	<property name="concurrency" value="3-10"/>
</bean>

程序化端点注册

JmsListenerEndpoint提供了JMS端点的模型,并负责为该模型配置容器。基础设施允许您以编程方式配置端点,除了通过JmsListener注解检测到的端点。以下示例展示了如何实现:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

	@Override
	public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
		SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
		endpoint.setId("myJmsEndpoint");
		endpoint.setDestination("anotherQueue");
		endpoint.setMessageListener(message -> {
			// 处理
		});
		registrar.registerEndpoint(endpoint);
	}
}

在上面的示例中,我们使用了SimpleJmsListenerEndpoint,它提供了要调用的实际MessageListener。但是,您也可以构建自己的端点变体来描述自定义调用机制。

请注意,您可以完全跳过使用@JmsListener,并仅通过JmsListenerConfigurer以编程方式注册您的端点。

注解端点方法签名

到目前为止,我们在端点中注入了一个简单的String,但实际上它可以具有非常灵活的方法签名。在下面的示例中,我们将其重写为注入带有自定义标头的Order

@Component
public class MyService {

	@JmsListener(destination = "myDestination")
	public void processOrder(Order order, @Header("order_type") String orderType) {
		...
	}
}

您可以在JMS监听器端点中注入的主要元素如下:

  • 原始的jakarta.jms.Message或其任何子类(前提是它与传入消息类型匹配)。

  • 用于可选访问原生JMS API的jakarta.jms.Session(例如,用于发送自定义回复)。

  • 表示传入JMS消息的org.springframework.messaging.Message。请注意,此消息包含自定义和标准标头(由JmsHeaders定义)。

  • 用于提取特定标头值的@Header注释方法参数,包括标准JMS标头。

  • 必须还能分配给java.util.Map@Headers注释参数,以获取所有标头的访问权限。

  • 非注释元素,不是受支持类型之一(MessageSession),被视为有效载荷。您可以通过使用@Payload注释参数来明确表示这一点。您还可以通过添加额外的@Valid来启用验证。

注入Spring的Message抽象特别有用,以从存储在特定传输消息中的所有信息中受益,而不依赖于特定传输特定API。以下示例展示了如何实现:

@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }

方法参数的处理由DefaultMessageHandlerMethodFactory提供,您可以进一步自定义以支持其他方法参数。您还可以在那里自定义转换和验证支持。

例如,如果我们希望在处理Order之前确保其有效性,我们可以使用@Valid注释有效载荷,并配置必要的验证器,如下例所示:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

	@Override
	public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
		registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
	}

	@Bean
	public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
		DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
		factory.setValidator(myValidator());
		return factory;
	}
}

响应管理

MessageListenerAdapter中的现有支持已经允许您的方法具有非void返回类型。在这种情况下,调用的结果封装在一个jakarta.jms.Message中,可以发送到原始消息的JMSReplyTo头中指定的目的地,或者发送到侦听器上配置的默认目的地。您现在可以通过使用消息抽象的@SendTo注解来设置默认目的地。

假设我们的processOrder方法现在应该返回一个OrderStatus,我们可以编写它以自动发送响应,如下例所示:

@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
	// 订单处理
	return status;
}
如果您有多个使用@JmsListener注解的方法,您还可以将@SendTo注解放在类级别上,以共享默认的回复目的地。

如果您需要以与传输无关的方式设置附加标头,您可以返回一个Message,方法类似于以下内容:

@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
	// 订单处理
	return MessageBuilder
			.withPayload(status)
			.setHeader("code", 1234)
			.build();
}

如果您需要在运行时计算响应目的地,您可以将响应封装在一个JmsResponse实例中,该实例还提供要在运行时使用的目的地。我们可以将前面的示例重写如下:

@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
	// 订单处理
	Message<OrderStatus> response = MessageBuilder
			.withPayload(status)
			.setHeader("code", 1234)
			.build();
	return JmsResponse.forQueue(response, "status");
}

最后,如果您需要为响应指定一些QoS值,例如优先级或存活时间,您可以相应地配置JmsListenerContainerFactory,如下例所示:

@Configuration
@EnableJms
public class AppConfig {

	@Bean
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory());
		QosSettings replyQosSettings = new QosSettings();
		replyQosSettings.setPriority(2);
		replyQosSettings.setTimeToLive(10000);
		factory.setReplyQosSettings(replyQosSettings);
		return factory;
	}
}