接收消息
这里描述了如何在Spring中使用JMS接收消息。
同步接收
虽然JMS通常与异步处理相关联,但您可以同步消费消息。重载的receive(..)
方法提供了这种功能。在同步接收期间,调用线程会阻塞,直到消息变为可用。这可能是一项危险的操作,因为调用线程可能会被无限期地阻塞。 receiveTimeout
属性指定接收者在放弃等待消息之前应等待多长时间。
异步接收:消息驱动的POJO
Spring还通过使用@JmsListener 注解支持带注解的监听器端点,并提供了一个开放的基础架构来以编程方式注册端点。这是目前设置异步接收器最方便的方式。有关更多详细信息,请参见启用监听器端点注解。 |
与EJB世界中的消息驱动Bean(MDB)类似,消息驱动的POJO(MDP)充当JMS消息的接收者。对MDP的一个限制(但请参见使用MessageListenerAdapter
)是它必须实现jakarta.jms.MessageListener
接口。请注意,如果您的POJO在多个线程上接收消息,则重要的是确保您的实现是线程安全的。
以下示例展示了MDP的简单实现:
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.TextMessage;
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("消息必须是TextMessage类型");
}
}
}
一旦您实现了MessageListener
,就该创建一个消息监听器容器了。
以下示例展示了如何定义和配置Spring附带的消息监听器容器之一(在本例中为DefaultMessageListenerContainer
):
<!-- 这是消息驱动的POJO(MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- 这是消息监听器容器 -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
查看各种消息监听器容器的Spring javadoc(所有这些容器都实现了MessageListenerContainer)以获取每个实现支持的功能的完整描述。
使用SessionAwareMessageListener
接口
SessionAwareMessageListener
接口是一个Spring特定的接口,它提供了与JMS MessageListener
接口类似的约定,但还允许消息处理方法访问接收到消息的JMS Session
。以下代码显示了SessionAwareMessageListener
接口的定义:
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
如果您希望您的MDPs实现此接口(而不是标准的JMS MessageListener
接口),以便您的MDPs能够响应任何接收到的消息(通过在onMessage(Message, Session)
方法中使用提供的Session
),则可以选择这样做。所有与Spring一起提供的消息监听器容器实现都支持实现了MessageListener
或SessionAwareMessageListener
接口的MDPs。实现SessionAwareMessageListener
的类带有一个警告,即它们通过接口与Spring绑定。是否使用它完全取决于您作为应用程序开发人员或架构师。
请注意,SessionAwareMessageListener
接口的onMessage(..)
方法会抛出JMSException
。与标准的JMS MessageListener
接口相反,当使用SessionAwareMessageListener
接口时,客户端代码有责任处理抛出的任何异常。
使用 MessageListenerAdapter
MessageListenerAdapter
类是Spring异步消息支持中的最终组件。简而言之,它允许您将几乎任何类公开为MDP(尽管存在一些约束)。
考虑以下接口定义:
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
请注意,尽管该接口既不扩展MessageListener
接口,也不扩展SessionAwareMessageListener
接口,但您仍然可以使用它作为MDP,通过使用MessageListenerAdapter
类。还请注意各种消息处理方法如何根据它们可以接收和处理的各种Message
类型的内容而强类型化。
现在考虑以下MessageDelegate
接口的实现:
public class DefaultMessageDelegate implements MessageDelegate {
// 为清晰起见省略实现...
}
特别要注意的是,前面的MessageDelegate
接口实现(DefaultMessageDelegate
类)根本没有任何JMS依赖关系。它确实是一个我们可以通过以下配置转换为MDP的POJO:
<!-- 这是消息驱动的POJO(MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- 这是消息监听器容器... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
下一个示例展示了另一个只能处理接收JMS TextMessage
消息的MDP。请注意消息处理方法实际上被称为receive
(MessageListenerAdapter
中的消息处理方法的名称默认为handleMessage
),但它是可配置的(稍后在本节中可以看到)。还请注意receive(..)
方法是强类型化的,只能接收和响应JMS TextMessage
消息。以下清单显示了TextMessageDelegate
接口的定义:
public interface TextMessageDelegate {
void receive(TextMessage message);
}
以下清单显示了实现TextMessageDelegate
接口的类:
public class DefaultTextMessageDelegate implements TextMessageDelegate {
// 为清晰起见省略实现...
}
随后的MessageListenerAdapter
的配置如下:
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- 我们不希望自动提取消息上下文 -->
<property name="messageConverter">
<null/>
</property>
</bean>
请注意,如果messageListener
接收到除TextMessage
之外的JMS Message
类型,将抛出IllegalStateException
(随后被吞噬)。MessageListenerAdapter
类的另一个功能是,如果处理程序方法返回非void值,则自动发送回复Message
的能力。考虑以下接口和类:
public interface ResponsiveTextMessageDelegate {
// 注意返回类型...
String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
// 为清晰起见省略实现...
}
如果将DefaultResponsiveTextMessageDelegate
与MessageListenerAdapter
一起使用,则从执行'receive(..)'
方法返回的任何非空值(在默认配置中)将转换为TextMessage
。然后,生成的TextMessage
将被发送到原始Message
的JMS Reply-To
属性中定义的Destination
(如果存在),或者发送到MessageListenerAdapter
上设置的默认Destination
(如果已配置)。如果找不到Destination
,则会抛出InvalidDestinationException
(请注意,此异常不会被吞噬,而是传播到调用堆栈上)。
在事务中处理消息
在事务中调用消息监听器只需要重新配置监听器容器。
您可以通过监听器容器定义上的sessionTransacted
标志激活本地资源事务。然后,每个消息监听器调用都在活动的JMS事务中运行,如果监听器执行失败,则消息接收将被回滚。发送响应消息(通过SessionAwareMessageListener
)是同一个本地事务的一部分,但任何其他资源操作(例如数据库访问)都是独立运行的。这通常需要在监听器实现中进行重复消息检测,以涵盖数据库处理已提交但消息处理未能提交的情况。
考虑以下bean定义:
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
要参与外部管理的事务,您需要配置一个事务管理器,并使用支持外部管理事务的监听器容器(通常为DefaultMessageListenerContainer
)。
要为XA事务参与配置消息监听器容器,您需要配置一个JtaTransactionManager
(默认情况下委托给Jakarta EE服务器的事务子系统)。请注意,底层的JMS ConnectionFactory
需要支持XA,并且需要正确注册到您的JTA事务协调器。 (检查您的Jakarta EE服务器对JNDI资源的配置。)这样可以使消息接收以及(例如)数据库访问成为同一个事务的一部分(具有统一的提交语义,但会增加XA事务日志开销)。
以下bean定义创建一个事务管理器:
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
然后,我们需要将其添加到先前的容器配置中。容器会处理其余部分。以下示例显示如何执行此操作:
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/> (1)
</bean>
1 | 我们的事务管理器。 |