Spring框架提供了广泛的支持,用于与消息系统集成,从简化使用JmsTemplate
的JMS API到完整的基础设施以异步接收消息。Spring AMQP为高级消息队列协议提供了类似的功能集。Spring Boot还为RabbitTemplate
和RabbitMQ提供了自动配置选项。Spring WebSocket本身包含对STOMP消息的支持,Spring Boot通过启动器和少量自动配置支持该功能。Spring Boot还支持Apache Kafka和Apache Pulsar。
1. JMS消息服务
jakarta.jms.ConnectionFactory
接口提供了一个标准的方法来创建一个与JMS代理交互的jakarta.jms.Connection
。虽然Spring需要一个ConnectionFactory
来与JMS一起工作,但通常您不需要直接使用它,而是可以依赖于更高级别的消息抽象。 (有关详细信息,请参阅Spring Framework参考文档的相关部分。)Spring Boot还会自动配置必要的基础设施来发送和接收消息。
1.1. ActiveMQ "Classic"支持
当在类路径上可用时,Spring Boot可以配置一个ConnectionFactory
以使用ActiveMQ "Classic"。
如果您使用spring-boot-starter-activemq ,则会提供连接到ActiveMQ "Classic"实例所需的必要依赖项,以及与JMS集成的Spring基础设施。 |
ActiveMQ "Classic"的配置由外部配置属性spring.activemq.*
控制。默认情况下,ActiveMQ "Classic"被自动配置为使用TCP传输,默认连接到tcp://localhost:61616
。以下示例显示如何更改默认的代理URL:
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
spring:
activemq:
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
默认情况下,CachingConnectionFactory
使用合理的设置包装原生的ConnectionFactory
,您可以通过外部配置属性spring.jms.*
来控制这些设置:
spring.jms.cache.session-cache-size=5
spring:
jms:
cache:
session-cache-size: 5
如果您更喜欢使用本机连接池,可以通过添加依赖项org.messaginghub:pooled-jms
并相应地配置JmsPoolConnectionFactory
来实现,如下例所示:
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
spring:
activemq:
pool:
enabled: true
max-connections: 50
有关更多支持的选项,请参阅ActiveMQProperties 。您还可以注册任意数量实现ActiveMQConnectionFactoryCustomizer 的bean以进行更高级别的自定义。 |
默认情况下,ActiveMQ "Classic"如果目的地尚不存在,则会创建一个目的地,以便根据其提供的名称解析目的地。
1.2. ActiveMQ Artemis支持
当Spring Boot检测到在类路径上可用ActiveMQ Artemis时,可以自动配置一个ConnectionFactory
。如果代理存在,则会自动启动和配置一个嵌入式代理(除非已显式设置了模式属性)。支持的模式有embedded
(明确指定需要嵌入式代理并且如果代理不在类路径上则应发生错误)和native
(使用netty
传输协议连接到代理)。配置后者时,Spring Boot会配置一个连接到运行在本地机器上的代理的ConnectionFactory
,使用默认设置。
如果您使用spring-boot-starter-artemis ,则会提供连接到现有ActiveMQ Artemis实例所需的必要依赖项,以及与JMS集成的Spring基础设施。将org.apache.activemq:artemis-jakarta-server 添加到您的应用程序可以让您使用嵌入式模式。 |
ActiveMQ Artemis的配置由外部配置属性spring.artemis.*
控制。例如,您可以在application.properties
中声明以下部分:
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
spring:
artemis:
mode: native
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
在嵌入代理时,您可以选择是否启用持久性并列出应提供的目的地。这些可以指定为逗号分隔的列表以使用默认选项创建它们,或者您可以定义org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration
或org.apache.activemq.artemis.jms.server.config.TopicConfiguration
类型的bean,分别用于高级队列和主题配置。
默认情况下,CachingConnectionFactory
使用合理的设置包装原生的ConnectionFactory
,您可以通过外部配置属性spring.jms.*
来控制这些设置:
spring.jms.cache.session-cache-size=5
spring:
jms:
cache:
session-cache-size: 5
如果您更喜欢使用本机连接池,可以通过添加依赖项org.messaginghub:pooled-jms
并相应地配置JmsPoolConnectionFactory
来实现,如下例所示:
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
spring:
artemis:
pool:
enabled: true
max-connections: 50
有关更多支持的选项,请参阅ArtemisProperties
。
不涉及JNDI查找,目的地根据其名称解析,使用ActiveMQ Artemis配置中的name
属性或通过配置提供的名称。
1.3. 使用JNDI ConnectionFactory
如果您在应用服务器中运行应用程序,Spring Boot会尝试使用JNDI来定位JMS ConnectionFactory
。默认情况下,会检查java:/JmsXA
和java:/XAConnectionFactory
位置。如果需要指定替代位置,可以使用spring.jms.jndi-name
属性,如下例所示:
spring.jms.jndi-name=java:/MyConnectionFactory
spring:
jms:
jndi-name: "java:/MyConnectionFactory"
1.4. 发送消息
Spring的JmsTemplate
是自动配置的,您可以直接将其自动装配到您自己的bean中,如下例所示:
@Component
public class MyBean {
private final JmsTemplate jmsTemplate;
public MyBean(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
}
@Component
class MyBean(private val jmsTemplate: JmsTemplate) {
}
JmsMessagingTemplate 可以以类似的方式注入。如果定义了DestinationResolver 或MessageConverter bean,则它会自动关联到自动配置的JmsTemplate 。 |
1.5. 接收消息
当JMS基础设施存在时,任何bean都可以使用@JmsListener
进行注解以创建一个监听器端点。如果未定义JmsListenerContainerFactory
,则会自动配置一个默认工厂。如果定义了DestinationResolver
、MessageConverter
或jakarta.jms.ExceptionListener
bean,则它们会自动关联到默认工厂。
默认情况下,默认工厂是事务性的。如果在具有JtaTransactionManager
的基础设施中运行,则默认情况下会将其与监听器容器关联。如果没有,则启用sessionTransacted
标志。在后一种情况下,您可以通过在监听器方法(或其委托)上添加@Transactional
来将本地数据存储事务与传入消息的处理关联起来。这确保一旦本地事务完成,传入消息就会被确认。这还包括在同一JMS会话上执行的发送响应消息。
以下组件在someQueue
目的地上创建一个监听器端点:
@Component
public class MyBean {
@JmsListener(destination = "someQueue")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@JmsListener(destination = "someQueue")
fun processMessage(content: String?) {
// ...
}
}
查看更多详细信息,请参阅@EnableJms 的Javadoc。 |
如果需要创建更多JmsListenerContainerFactory
实例,或者想要覆盖默认设置,Spring Boot提供了一个DefaultJmsListenerContainerFactoryConfigurer
,您可以使用它来初始化一个具有与自动配置相同设置的DefaultJmsListenerContainerFactory
。
例如,以下示例展示了另一个使用特定MessageConverter
的工厂:
@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {
@Bean
public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
@Configuration(proxyBeanMethods = false)
class MyJmsConfiguration {
@Bean
fun myFactory(configurer: DefaultJmsListenerContainerFactoryConfigurer): DefaultJmsListenerContainerFactory {
val factory = DefaultJmsListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory? {
return ...
}
}
然后,您可以在任何使用@JmsListener
注解的方法中使用工厂,如下所示:
@Component
public class MyBean {
@JmsListener(destination = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@JmsListener(destination = "someQueue", containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
2. AMQP
2. AMQP高级消息队列协议(AMQP)是一个平台中立的、基于消息的中间件的协议。Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。Spring Boot通过RabbitMQ为使用AMQP提供了几个便利功能,包括spring-boot-starter-amqp
“Starter”。
2.1. RabbitMQ支持
RabbitMQ是一个基于AMQP协议的轻量级、可靠、可扩展和便携的消息代理。Spring使用RabbitMQ通过AMQP协议进行通信。
RabbitMQ的配置由spring.rabbitmq.*
中的外部配置属性控制。例如,您可以在application.properties
中声明以下部分:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
rabbitmq:
host: "localhost"
port: 5672
username: "admin"
password: "secret"
或者,您可以使用addresses
属性配置相同的连接:
spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
rabbitmq:
addresses: "amqp://admin:secret@localhost"
以这种方式指定地址时,host 和port 属性将被忽略。如果地址使用amqps 协议,则SSL支持将自动启用。 |
查看更多支持基于属性的配置选项,请参阅RabbitProperties
。要配置Spring AMQP使用的RabbitMQ ConnectionFactory
的更低级别细节,请定义一个ConnectionFactoryCustomizer
bean。
如果上下文中存在ConnectionNameStrategy
bean,则自动配置的CachingConnectionFactory
将自动使用它来命名创建的连接。
要对RabbitTemplate
进行应用范围的附加自定义,请使用RabbitTemplateCustomizer
bean。
有关更多详细信息,请参阅了解RabbitMQ使用的协议AMQP。 |
2.2. 发送消息
Spring的AmqpTemplate
和AmqpAdmin
是自动配置的,您可以直接将它们自动装配到您自己的bean中,如下例所示:
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
}
@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {
}
RabbitMessagingTemplate 可以以类似的方式注入。如果定义了MessageConverter bean,则它将自动关联到自动配置的AmqpTemplate 。 |
如果需要,任何定义为bean的org.springframework.amqp.core.Queue
将自动用于在RabbitMQ实例上声明相应的队列。
要重试操作,您可以在AmqpTemplate
上启用重试(例如,在经纪人连接丢失的情况下):
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
rabbitmq:
template:
retry:
enabled: true
initial-interval: "2s"
默认情况下禁用重试。您还可以通过声明一个RabbitRetryTemplateCustomizer
bean来以编程方式自定义RetryTemplate
。
如果需要创建更多的RabbitTemplate
实例,或者想要覆盖默认设置,Spring Boot提供了一个RabbitTemplateConfigurer
bean,您可以使用它来初始化一个RabbitTemplate
,其设置与自动配置使用的工厂相同。
2.3. 发送消息到流
要向特定流发送消息,请指定流的名称,如下例所示:
spring.rabbitmq.stream.name=my-stream
spring:
rabbitmq:
stream:
name: "my-stream"
如果定义了MessageConverter
、StreamMessageConverter
或ProducerCustomizer
bean,则它将自动关联到自动配置的RabbitStreamTemplate
。
如果需要创建更多的RabbitStreamTemplate
实例,或者想要覆盖默认设置,Spring Boot提供了一个RabbitStreamTemplateConfigurer
bean,您可以使用它来初始化一个RabbitStreamTemplate
,其设置与自动配置使用的工厂相同。
2.4. 接收消息
当Rabbit基础设施存在时,任何bean都可以用@RabbitListener
注解来创建一个监听器端点。如果没有定义RabbitListenerContainerFactory
,则会自动配置一个默认的SimpleRabbitListenerContainerFactory
,您可以通过spring.rabbitmq.listener.type
属性切换到直接容器。如果定义了MessageConverter
或MessageRecoverer
bean,则会自动与默认工厂关联。
以下示例组件在someQueue
队列上创建了一个监听器端点:
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"])
fun processMessage(content: String?) {
// ...
}
}
查看@EnableRabbit 的Javadoc获取更多详细信息。 |
如果需要创建更多的RabbitListenerContainerFactory
实例,或者想要覆盖默认设置,Spring Boot提供了SimpleRabbitListenerContainerFactoryConfigurer
和DirectRabbitListenerContainerFactoryConfigurer
,您可以使用它们来初始化一个SimpleRabbitListenerContainerFactory
和一个DirectRabbitListenerContainerFactory
,并使用与自动配置使用的工厂相同的设置。
无论选择哪种容器类型都无关紧要。这两个bean都由自动配置暴露出来。 |
例如,以下配置类公开了另一个使用特定MessageConverter
的工厂:
@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {
@Bean
fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
val factory = SimpleRabbitListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory? {
return ...
}
}
然后,您可以在任何使用@RabbitListener
注解的方法中使用该工厂,如下所示:
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
您可以启用重试来处理监听器抛出异常的情况。默认情况下,使用RejectAndDontRequeueRecoverer
,但您可以定义自己的MessageRecoverer
。当重试耗尽时,消息将被拒绝,并且如果代理配置为这样做,则将被丢弃或路由到死信交换。默认情况下,重试被禁用。您还可以通过声明RabbitRetryTemplateCustomizer
bean来以编程方式自定义RetryTemplate
。
默认情况下,如果禁用重试并且监听器抛出异常,则传递将无限重试。您可以通过两种方式修改此行为:将defaultRequeueRejected 属性设置为false ,以便不尝试重新传递,或者抛出AmqpRejectAndDontRequeueException 来表示应拒绝消息。当启用重试并达到最大传递尝试次数时,后者是使用的机制。 |
3. Apache Kafka支持
Apache Kafka 是通过提供 spring-kafka
项目的自动配置来支持的。
Kafka 配置受 spring.kafka.*
中的外部配置属性控制。例如,您可以在 application.properties
中声明以下部分:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "myGroup"
要在启动时创建一个主题,请添加一个类型为 NewTopic 的 bean。如果主题已经存在,则该 bean 将被忽略。 |
查看 KafkaProperties
获取更多支持的选项。
3.1. 发送消息
Spring 的 KafkaTemplate
是自动配置的,您可以直接在自己的 bean 中自动装配它,如下例所示:
@Component
public class MyBean {
private final KafkaTemplate<String, String> kafkaTemplate;
public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
}
@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {
}
如果定义了属性 spring.kafka.producer.transaction-id-prefix ,将自动配置一个 KafkaTransactionManager 。此外,如果定义了一个 RecordMessageConverter bean,则会自动将其关联到自动配置的 KafkaTemplate 。 |
3.2. 接收消息
当 Apache Kafka 基础设施存在时,任何 bean 都可以用 @KafkaListener
注解来创建一个监听器端点。如果未定义 KafkaListenerContainerFactory
,则会自动配置一个默认的工厂,并使用 spring.kafka.listener.*
中定义的键。
以下组件在 someTopic
主题上创建一个监听器端点:
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@KafkaListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
如果定义了一个 KafkaTransactionManager
bean,则会自动将其关联到容器工厂。类似地,如果定义了 RecordFilterStrategy
、CommonErrorHandler
、AfterRollbackProcessor
或 ConsumerAwareRebalanceListener
bean,则会自动将其关联到默认工厂。
根据监听器类型,会将 RecordMessageConverter
或 BatchMessageConverter
bean 关联到默认工厂。如果仅为批量监听器存在 RecordMessageConverter
bean,则会将其包装在 BatchMessageConverter
中。
自定义的 ChainedKafkaTransactionManager 必须标记为 @Primary ,因为它通常引用自动配置的 KafkaTransactionManager bean。 |
3.3. Kafka Streams
Spring for Apache Kafka 提供了一个工厂 bean 来创建一个 StreamsBuilder
对象并管理其流的生命周期。只要类路径中存在 kafka-streams
并且通过 @EnableKafkaStreams
注解启用了 Kafka Streams,Spring Boot 就会自动配置所需的 KafkaStreamsConfiguration
bean。
启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。前者可以使用 spring.kafka.streams.application-id
进行配置,默认为 spring.application.name
(如果未设置)。后者可以全局设置,或仅为流覆盖。
使用专用属性可以使用其他附加属性;可以使用 spring.kafka.streams.properties
命名空间设置其他任意 Kafka 属性。另请参阅 其他 Kafka 属性 以获取更多信息。
要使用工厂 bean,请将 StreamsBuilder
注入到您的 @Bean
中,如下例所示:
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}
private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
return new KeyValue<>(key, value.toUpperCase());
}
}
默认情况下,由 StreamBuilder
对象管理的流会自动启动。您可以使用 spring.kafka.streams.auto-startup
属性自定义此行为。
3.4. 额外的Kafka属性
自动配置支持的属性在附录的“集成属性”部分中显示。请注意,大部分这些属性(连字符或驼峰命名)直接映射到Apache Kafka的点分属性。有关详细信息,请参阅Apache Kafka文档。
不包含客户端类型(producer
、consumer
、admin
或streams
)在其名称中的属性被视为通用属性,适用于所有客户端。大多数这些通用属性可以根据需要覆盖一个或多个客户端类型。
Apache Kafka将属性指定为高、中或低重要性。Spring Boot自动配置支持所有高重要性属性,一些选定的中和低重要性属性,以及没有默认值的任何属性。
只有Kafka支持的属性子集可以直接通过KafkaProperties
类获得。如果您希望配置不受直接支持的附加属性的各个客户端类型,请使用以下属性:
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
spring:
kafka:
properties:
"[prop.one]": "first"
admin:
properties:
"[prop.two]": "second"
consumer:
properties:
"[prop.three]": "third"
producer:
properties:
"[prop.four]": "fourth"
streams:
properties:
"[prop.five]": "fifth"
这将把通用的prop.one
Kafka属性设置为first
(适用于生产者、消费者、管理员和流),将prop.two
管理员属性设置为second
,将prop.three
消费者属性设置为third
,将prop.four
生产者属性设置为fourth
,将prop.five
流属性设置为fifth
。
您还可以按以下方式配置Spring Kafka的JsonDeserializer
:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
spring:
kafka:
consumer:
value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
properties:
"[spring.json.value.default.type]": "com.example.Invoice"
"[spring.json.trusted.packages]": "com.example.main,com.example.another"
类似地,您可以禁用JsonSerializer
默认行为,不在标头中发送类型信息:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
spring:
kafka:
producer:
value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
properties:
"[spring.json.add.type.headers]": false
以这种方式设置的属性将覆盖Spring Boot明确支持的任何配置项。 |
3.5. 使用嵌入式Kafka进行测试
Spring for Apache Kafka提供了一种方便的方式来使用嵌入式Apache Kafka代理测试项目。要使用此功能,请在测试类上使用@EmbeddedKafka
,该注解来自spring-kafka-test
模块。有关更多信息,请参阅Spring for Apache Kafka的参考手册。
为使Spring Boot自动配置与上述嵌入式Apache Kafka代理配合工作,您需要将嵌入式代理地址(由EmbeddedKafkaBroker
填充)重新映射为Apache Kafka的Spring Boot配置属性。有几种方法可以实现:
-
在测试类中提供一个系统属性,将嵌入式代理地址映射到
spring.kafka.bootstrap-servers
:
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
init {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
}
-
在
@EmbeddedKafka
注解上配置一个属性名称:
@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
-
在配置属性中使用占位符:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring:
kafka:
bootstrap-servers: "${spring.embedded.kafka.brokers}"
4. Apache Pulsar支持
Apache Pulsar得到支持,提供了Spring for Apache Pulsar项目的自动配置。
当org.springframework.pulsar:spring-pulsar
在类路径上时,Spring Boot将自动配置并注册经典(命令式)的Spring for Apache Pulsar组件。当org.springframework.pulsar:spring-pulsar-reactive
在类路径上时,它也会为响应式组件执行相同的操作。
有spring-boot-starter-pulsar
和spring-boot-starter-pulsar-reactive
“启动器”可方便地收集用于命令式和响应式使用的依赖项。
4.1. 连接到Pulsar
当您使用Pulsar启动器时,Spring Boot将自动配置并注册一个PulsarClient
bean。
默认情况下,应用程序会尝试连接到本地Pulsar实例,地址为pulsar://localhost:6650
。这可以通过将spring.pulsar.client.service-url
属性设置为不同的值来调整。
该值必须是有效的Pulsar协议 URL |
您可以通过指定任何以spring.pulsar.client.*
为前缀的应用程序属性来配置客户端。
如果您需要更多对配置的控制,请考虑注册一个或多个PulsarClientBuilderCustomizer
bean。
4.1.1. 认证
要连接到需要认证的Pulsar集群,您需要指定要使用的认证插件,方法是设置pluginClassName
和插件所需的任何参数。您可以将参数设置为参数名称到参数值的映射。以下示例显示了如何配置AuthenticationOAuth2
插件。
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param[issuerUrl]=https://auth.server.cloud/
spring.pulsar.client.authentication.param[privateKey]=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
param:
issuerUrl: https://auth.server.cloud/
privateKey: file:///Users/some-key.json
audience: urn:sn:acme:dev:my-instance
您需要确保在 例如,如果您想要为 这种缺乏松散绑定也使得对认证参数使用环境变量存在问题,因为在翻译过程中会丢失大小写敏感性。如果您使用环境变量作为参数,则需要按照Spring for Apache Pulsar参考文档中的这些步骤才能正常工作。 |
4.2. 响应式连接到Pulsar
ReactivePulsarClient bean。
ReactivePulsarClient
适配先前描述的PulsarClient
的实例。因此,请按照前一节的说明配置PulsarClient
,以供ReactivePulsarClient
使用。
4.3. 连接到Pulsar管理
Spring for Apache Pulsar的PulsarAdministration
客户端也会自动配置。
默认情况下,应用程序会尝试连接到本地Pulsar实例,地址为http://localhost:8080
。这可以通过将spring.pulsar.admin.service-url
属性设置为形式为(http|https)://<host>:<port>
的不同值来调整。
如果您需要更多对配置的控制,请考虑注册一个或多个PulsarAdminBuilderCustomizer
bean。
4.3.1. 认证
当访问需要认证的Pulsar集群时,管理客户端需要与常规Pulsar客户端相同的安全配置。您可以通过将spring.pulsar.client.authentication
替换为spring.pulsar.admin.authentication
来使用前面提到的认证配置。
要在启动时创建主题,请添加一个类型为PulsarTopic 的bean。如果主题已经存在,则会忽略该bean。 |
4.4. 发送消息
Spring的PulsarTemplate
已自动配置,您可以使用它来发送消息,如下例所示:
@Component
public class MyBean {
private final PulsarTemplate<String> pulsarTemplate;
public MyBean(PulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() throws PulsarClientException {
this.pulsarTemplate.send("someTopic", "Hello");
}
}
@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {
@Throws(PulsarClientException::class)
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello")
}
}
PulsarTemplate
依赖于PulsarProducerFactory
来创建底层的Pulsar生产者。Spring Boot自动配置还提供了此生产者工厂,默认情况下,它会缓存创建的生产者。您可以通过指定任何以spring.pulsar.producer.*
和spring.pulsar.producer.cache.*
为前缀的应用程序属性来配置生产者工厂和缓存设置。
如果您需要更多对生产者工厂配置的控制,请考虑注册一个或多个ProducerBuilderCustomizer
bean。这些自定义器将应用于所有创建的生产者。您还可以在发送消息时传递一个ProducerBuilderCustomizer
,以仅影响当前生产者。
如果您需要更多对发送消息的控制,请在发送消息时传递一个TypedMessageBuilderCustomizer
。
4.5. 响应式发送消息
当激活响应式自动配置时,Spring的ReactivePulsarTemplate
会被自动配置,您可以像下面的示例一样使用它来发送消息:
@Component
public class MyBean {
private final ReactivePulsarTemplate<String> pulsarTemplate;
public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() {
this.pulsarTemplate.send("someTopic", "Hello").subscribe();
}
}
@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello").subscribe()
}
}
ReactivePulsarTemplate
依赖于ReactivePulsarSenderFactory
来实际创建底层的发送器。Spring Boot自动配置还提供了这个发送器工厂,默认情况下会缓存它创建的生产者。您可以通过指定任何以spring.pulsar.producer.*
和spring.pulsar.producer.cache.*
为前缀的应用程序属性来配置发送器工厂和缓存设置。
如果您需要更多控制发送器工厂的配置,请考虑注册一个或多个ReactiveMessageSenderBuilderCustomizer
bean。这些定制器将应用于所有创建的发送器。您还可以在发送消息时传入一个ReactiveMessageSenderBuilderCustomizer
,以仅影响当前发送器。
如果您需要更多控制要发送的消息,可以在发送消息时传入一个MessageSpecBuilderCustomizer
。
4.6. 接收消息
当Apache Pulsar基础设施存在时,任何bean都可以使用@PulsarListener
注解来创建一个监听器端点。以下组件在someTopic
主题上创建了一个监听器端点:
@Component
public class MyBean {
@PulsarListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@PulsarListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
Spring Boot自动配置提供了所有PulsarListener
所需的组件,如PulsarListenerContainerFactory
和它用于构建底层Pulsar消费者的消费者工厂。您可以通过指定任何以spring.pulsar.listener.*
和spring.pulsar.consumer.*
为前缀的应用程序属性来配置这些组件。
如果您需要更多控制消费者工厂的配置,请考虑注册一个或多个ConsumerBuilderCustomizer
bean。这些定制器将应用于工厂创建的所有消费者,因此所有@PulsarListener
实例。您还可以通过设置@PulsarListener
注解的consumerCustomizer
属性来自定义单个监听器。
4.7. 响应式接收消息
当Apache Pulsar基础设施存在且激活了响应式自动配置时,任何bean都可以使用@ReactivePulsarListener
注解来创建一个响应式监听器端点。以下组件在someTopic
主题上创建了一个响应式监听器端点:
@Component
public class MyBean {
@ReactivePulsarListener(topics = "someTopic")
public Mono<Void> processMessage(String content) {
// ...
return Mono.empty();
}
}
@Component
class MyBean {
@ReactivePulsarListener(topics = ["someTopic"])
fun processMessage(content: String?): Mono<Void> {
// ...
return Mono.empty()
}
}
Spring Boot自动配置提供了所有ReactivePulsarListener
所需的组件,如ReactivePulsarListenerContainerFactory
和它用于构建底层响应式Pulsar消费者的消费者工厂。您可以通过指定任何以spring.pulsar.listener.
和spring.pulsar.consumer.
为前缀的应用程序属性来配置这些组件。
如果您需要更多控制消费者工厂的配置,请考虑注册一个或多个ReactiveMessageConsumerBuilderCustomizer
bean。这些定制器将应用于工厂创建的所有消费者,因此所有@ReactivePulsarListener
实例。您还可以通过设置@ReactivePulsarListener
注解的consumerCustomizer
属性来自定义单个监听器。
4.8. 读取消息
Pulsar读取器接口使应用程序能够手动管理游标。当您使用读取器连接到主题时,需要指定读取器连接到主题时从哪条消息开始读取。
当Apache Pulsar基础设施存在时,任何bean都可以使用@PulsarReader
注解来使用读取器消费消息。以下组件创建了一个读取器端点,从someTopic
主题的开头开始读取消息:
@Component
public class MyBean {
@PulsarReader(topics = "someTopic", startMessageId = "earliest")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
fun processMessage(content: String?) {
// ...
}
}
@PulsarReader
依赖于PulsarReaderFactory
来创建底层的Pulsar读取器。Spring Boot自动配置提供了这个读取器工厂,可以通过设置任何以spring.pulsar.reader.*
为前缀的应用程序属性进行自定义。
如果您需要更多控制读取器工厂的配置,请考虑注册一个或多个ReaderBuilderCustomizer
bean。这些定制器将应用于工厂创建的所有读取器,因此所有@PulsarReader
实例。您还可以通过设置@PulsarReader
注解的readerCustomizer
属性来自定义单个读取器。
4.9. 以响应式方式读取消息
当Apache Pulsar基础设施存在并且激活了Reactive自动配置时,Spring的ReactivePulsarReaderFactory
会被提供,您可以使用它来创建一个读取消息的响应式读取器。以下组件使用提供的工厂创建一个读取器,并从someTopic
主题中读取5分钟前的单条消息:
@Component
public class MyBean {
private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;
public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
this.pulsarReaderFactory = pulsarReaderFactory;
}
public void someMethod() {
ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
.topic("someTopic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
Mono<Message<String>> message = this.pulsarReaderFactory
.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
.readOne();
// ...
}
}
@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {
fun someMethod() {
val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
readerBuilder: ReactiveMessageReaderBuilder<String> ->
readerBuilder
.topic("someTopic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5))
}
val message = pulsarReaderFactory
.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
.readOne()
// ...
}
}
Spring Boot自动配置提供了这个读取器工厂,可以通过设置任何以spring.pulsar.reader.*
为前缀的应用程序属性进行自定义。
如果您需要更多控制读取器工厂配置,考虑在使用工厂创建读取器时传入一个或多个ReactiveMessageReaderBuilderCustomizer
实例。
如果您需要更多控制读取器工厂配置,考虑注册一个或多个ReactiveMessageReaderBuilderCustomizer
bean。这些自定义器将应用于所有创建的读取器。您还可以在创建读取器时传入一个或多个ReactiveMessageReaderBuilderCustomizer
,以仅将自定义应用于创建的读取器。
有关上述组件的更多详细信息以及发现其他可用功能,请参阅Spring for Apache Pulsar 参考文档。 |
4.10. 其他Pulsar属性
自动配置支持的属性在附录的“集成属性”部分中列出。请注意,这些属性(连字符或驼峰命名)在很大程度上直接映射到Apache Pulsar配置属性。有关详细信息,请参阅Apache Pulsar文档。
仅有Pulsar支持的属性可以直接通过PulsarProperties
类获得。如果您希望使用不受直接支持的其他属性调整自动配置的组件,可以使用每个前述组件支持的自定义器。
5. RSocket
RSocket是一种用于字节流传输的二进制协议。它通过单个连接实现了对称交互模型,通过异步消息传递来实现。
Spring Framework的spring-messaging
模块提供了对RSocket请求者和响应者的支持,既可以在客户端上使用,也可以在服务器端上使用。查看Spring Framework参考文档中的RSocket部分以获取更多详细信息,包括RSocket协议概述。
5.1. RSocket策略自动配置
Spring Boot自动配置了一个RSocketStrategies
bean,提供了所有编码和解码RSocket负载所需的基础设施。默认情况下,自动配置将尝试按顺序配置以下内容:
-
CBOR编解码器与Jackson
-
JSON编解码器与Jackson
spring-boot-starter-rsocket
starter提供了这两个依赖项。查看Jackson支持部分以了解更多自定义可能性。
开发人员可以通过创建实现RSocketStrategiesCustomizer
接口的bean来自定义RSocketStrategies
组件。请注意,它们的@Order
很重要,因为它决定了编解码器的顺序。
5.2. RSocket服务器自动配置
Spring Boot提供了RSocket服务器自动配置。所需的依赖项由spring-boot-starter-rsocket
提供。
Spring Boot允许从WebFlux服务器公开RSocket over WebSocket,或者独立启动一个独立的RSocket服务器。这取决于应用程序的类型和配置。
对于WebFlux应用程序(即WebApplicationType.REACTIVE
类型),只有在以下属性匹配时,RSocket服务器才会插入到Web服务器中:
spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocket
spring:
rsocket:
server:
mapping-path: "/rsocket"
transport: "websocket"
将RSocket插入到Web服务器中仅支持Reactor Netty,因为RSocket本身是使用该库构建的。 |
或者,作为独立的嵌入式服务器启动RSocket TCP或websocket服务器。除了依赖要求之外,唯一需要的配置是为该服务器定义一个端口:
spring.rsocket.server.port=9898
spring:
rsocket:
server:
port: 9898
5.3. Spring消息RSocket支持
Spring Boot将为RSocket自动配置Spring消息基础设施。
这意味着Spring Boot将创建一个RSocketMessageHandler
bean,用于处理RSocket请求到您的应用程序。
5.4. 使用RSocketRequester调用RSocket服务
一旦服务器和客户端之间建立了RSocket
通道,任何一方都可以向另一方发送或接收请求。
作为服务器,您可以在RSocket @Controller
的任何处理程序方法中注入一个RSocketRequester
实例。作为客户端,您需要首先配置和建立RSocket连接。Spring Boot为这种情况自动配置了一个RSocketRequester.Builder
,其中包含预期的编解码器,并应用任何RSocketConnectorConfigurer
bean。
RSocketRequester.Builder
实例是一个原型bean,这意味着每个注入点都会提供一个新实例。这是有意为之的,因为这个构建器是有状态的,您不应该使用相同实例创建具有不同设置的请求者。
以下代码展示了一个典型的示例:
@Service
public class MyService {
private final RSocketRequester rsocketRequester;
public MyService(RSocketRequester.Builder rsocketRequesterBuilder) {
this.rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898);
}
public Mono<User> someRSocketCall(String name) {
return this.rsocketRequester.route("user").data(name).retrieveMono(User.class);
}
}
@Service
class MyService(rsocketRequesterBuilder: RSocketRequester.Builder) {
private val rsocketRequester: RSocketRequester
init {
rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898)
}
fun someRSocketCall(name: String): Mono<User> {
return rsocketRequester.route("user").data(name).retrieveMono(
User::class.java
)
}
}
6. Spring集成
Spring Boot为使用Spring集成提供了几项便利功能,包括spring-boot-starter-integration
“Starter”。Spring集成提供了对消息传递以及其他传输方式(如HTTP、TCP等)的抽象。如果在类路径中可用Spring集成,则通过@EnableIntegration
注解进行初始化。
Spring集成的轮询逻辑依赖于自动配置的TaskScheduler
。默认的PollerMetadata
(每秒轮询无限数量的消息)可以通过spring.integration.poller.*
配置属性进行自定义。
Spring Boot还配置了一些功能,这些功能是由额外的Spring集成模块的存在触发的。如果类路径中也有spring-integration-jmx
,则会通过JMX发布消息处理统计信息。如果可用spring-integration-jdbc
,则默认数据库模式可以在启动时创建,如下所示:
spring.integration.jdbc.initialize-schema=always
spring:
integration:
jdbc:
initialize-schema: "always"
如果可用spring-integration-rsocket
,开发人员可以使用"spring.rsocket.server.*"
属性配置一个RSocket服务器,并让其使用IntegrationRSocketEndpoint
或RSocketOutboundGateway
组件来处理传入的RSocket消息。此基础设施可以处理Spring集成RSocket通道适配器和@MessageMapping
处理程序(假设已配置"spring.integration.rsocket.server.message-mapping-enabled"
)。
Spring Boot还可以使用配置属性自动配置一个ClientRSocketConnector
:
# 连接到TCP上的RSocket服务器
spring.integration.rsocket.client.host=example.org
spring.integration.rsocket.client.port=9898
# 连接到TCP上的RSocket服务器
spring:
integration:
rsocket:
client:
host: "example.org"
port: 9898
# 连接到WebSocket上的RSocket服务器
spring.integration.rsocket.client.uri=ws://example.org
# 连接到WebSocket上的RSocket服务器
spring:
integration:
rsocket:
client:
uri: "ws://example.org"
查看IntegrationAutoConfiguration
和IntegrationProperties
类以获取更多详细信息。
7. WebSockets
Spring Boot为嵌入式Tomcat、Jetty和Undertow提供了WebSockets自动配置。如果将war文件部署到独立容器中,Spring Boot会假定容器负责配置其WebSocket支持。
Spring框架为MVC Web应用程序提供了丰富的WebSocket支持,可以通过spring-boot-starter-websocket
模块轻松访问。
WebSocket支持也适用于响应式Web应用程序,需要在spring-boot-starter-webflux
旁边包含WebSocket API:
<dependency>
<groupId>jakarta.websocket</groupId>
<artifactId>jakarta.websocket-api</artifactId>
</dependency>