Spring框架提供了广泛的支持,用于与消息系统集成,从简化使用JmsTemplate的JMS API到完整的基础设施以异步接收消息。Spring AMQP为高级消息队列协议提供了类似的功能集。Spring Boot还为RabbitTemplate和RabbitMQ提供了自动配置选项。Spring WebSocket本身包含对STOMP消息的支持,Spring Boot通过启动器和少量自动配置支持该功能。Spring Boot还支持Apache Kafka和Apache Pulsar。

1. JMS消息服务

jakarta.jms.ConnectionFactory接口提供了一个标准的方法来创建一个与JMS代理交互的jakarta.jms.Connection。虽然Spring需要一个ConnectionFactory来与JMS一起工作,但通常您不需要直接使用它,而是可以依赖于更高级别的消息抽象。 (有关详细信息,请参阅Spring Framework参考文档的相关部分。)Spring Boot还会自动配置必要的基础设施来发送和接收消息。

1.1. ActiveMQ "Classic"支持

当在类路径上可用时,Spring Boot可以配置一个ConnectionFactory以使用ActiveMQ "Classic"

如果您使用spring-boot-starter-activemq,则会提供连接到ActiveMQ "Classic"实例所需的必要依赖项,以及与JMS集成的Spring基础设施。

ActiveMQ "Classic"的配置由外部配置属性spring.activemq.*控制。默认情况下,ActiveMQ "Classic"被自动配置为使用TCP传输,默认连接到tcp://localhost:61616。以下示例显示如何更改默认的代理URL:

属性
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
Yaml
spring:
  activemq:
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

默认情况下,CachingConnectionFactory使用合理的设置包装原生的ConnectionFactory,您可以通过外部配置属性spring.jms.*来控制这些设置:

属性
spring.jms.cache.session-cache-size=5
Yaml
spring:
  jms:
    cache:
      session-cache-size: 5

如果您更喜欢使用本机连接池,可以通过添加依赖项org.messaginghub:pooled-jms并相应地配置JmsPoolConnectionFactory来实现,如下例所示:

属性
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
Yaml
spring:
  activemq:
    pool:
      enabled: true
      max-connections: 50
有关更多支持的选项,请参阅ActiveMQProperties。您还可以注册任意数量实现ActiveMQConnectionFactoryCustomizer的bean以进行更高级别的自定义。

默认情况下,ActiveMQ "Classic"如果目的地尚不存在,则会创建一个目的地,以便根据其提供的名称解析目的地。

1.2. ActiveMQ Artemis支持

当Spring Boot检测到在类路径上可用ActiveMQ Artemis时,可以自动配置一个ConnectionFactory。如果代理存在,则会自动启动和配置一个嵌入式代理(除非已显式设置了模式属性)。支持的模式有embedded(明确指定需要嵌入式代理并且如果代理不在类路径上则应发生错误)和native(使用netty传输协议连接到代理)。配置后者时,Spring Boot会配置一个连接到运行在本地机器上的代理的ConnectionFactory,使用默认设置。

如果您使用spring-boot-starter-artemis,则会提供连接到现有ActiveMQ Artemis实例所需的必要依赖项,以及与JMS集成的Spring基础设施。将org.apache.activemq:artemis-jakarta-server添加到您的应用程序可以让您使用嵌入式模式。

ActiveMQ Artemis的配置由外部配置属性spring.artemis.*控制。例如,您可以在application.properties中声明以下部分:

属性
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
Yaml
spring:
  artemis:
    mode: native
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

在嵌入代理时,您可以选择是否启用持久性并列出应提供的目的地。这些可以指定为逗号分隔的列表以使用默认选项创建它们,或者您可以定义org.apache.activemq.artemis.jms.server.config.JMSQueueConfigurationorg.apache.activemq.artemis.jms.server.config.TopicConfiguration类型的bean,分别用于高级队列和主题配置。

默认情况下,CachingConnectionFactory使用合理的设置包装原生的ConnectionFactory,您可以通过外部配置属性spring.jms.*来控制这些设置:

属性
spring.jms.cache.session-cache-size=5
Yaml
spring:
  jms:
    cache:
      session-cache-size: 5

如果您更喜欢使用本机连接池,可以通过添加依赖项org.messaginghub:pooled-jms并相应地配置JmsPoolConnectionFactory来实现,如下例所示:

属性
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
Yaml
spring:
  artemis:
    pool:
      enabled: true
      max-connections: 50

有关更多支持的选项,请参阅ArtemisProperties

不涉及JNDI查找,目的地根据其名称解析,使用ActiveMQ Artemis配置中的name属性或通过配置提供的名称。

1.3. 使用JNDI ConnectionFactory

如果您在应用服务器中运行应用程序,Spring Boot会尝试使用JNDI来定位JMS ConnectionFactory。默认情况下,会检查java:/JmsXAjava:/XAConnectionFactory位置。如果需要指定替代位置,可以使用spring.jms.jndi-name属性,如下例所示:

属性
spring.jms.jndi-name=java:/MyConnectionFactory
Yaml
spring:
  jms:
    jndi-name: "java:/MyConnectionFactory"

1.4. 发送消息

Spring的JmsTemplate是自动配置的,您可以直接将其自动装配到您自己的bean中,如下例所示:

Java
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final JmsTemplate jmsTemplate;

    public MyBean(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    // ...

    public void someMethod() {
        this.jmsTemplate.convertAndSend("hello");
    }

}
Kotlin
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val jmsTemplate: JmsTemplate) {

    // ...

    fun someMethod() {
        jmsTemplate.convertAndSend("hello")
    }

}
JmsMessagingTemplate可以以类似的方式注入。如果定义了DestinationResolverMessageConverter bean,则它会自动关联到自动配置的JmsTemplate

1.5. 接收消息

当JMS基础设施存在时,任何bean都可以使用@JmsListener进行注解以创建一个监听器端点。如果未定义JmsListenerContainerFactory,则会自动配置一个默认工厂。如果定义了DestinationResolverMessageConverterjakarta.jms.ExceptionListener bean,则它们会自动关联到默认工厂。

默认情况下,默认工厂是事务性的。如果在具有JtaTransactionManager的基础设施中运行,则默认情况下会将其与监听器容器关联。如果没有,则启用sessionTransacted标志。在后一种情况下,您可以通过在监听器方法(或其委托)上添加@Transactional来将本地数据存储事务与传入消息的处理关联起来。这确保一旦本地事务完成,传入消息就会被确认。这还包括在同一JMS会话上执行的发送响应消息。

以下组件在someQueue目的地上创建一个监听器端点:

Java
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @JmsListener(destination = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @JmsListener(destination = "someQueue")
    fun processMessage(content: String?) {
        // ...
    }

}
查看更多详细信息,请参阅@EnableJms的Javadoc

如果需要创建更多JmsListenerContainerFactory实例,或者想要覆盖默认设置,Spring Boot提供了一个DefaultJmsListenerContainerFactoryConfigurer,您可以使用它来初始化一个具有与自动配置相同设置的DefaultJmsListenerContainerFactory

例如,以下示例展示了另一个使用特定MessageConverter的工厂:

Java
import jakarta.jms.ConnectionFactory;

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {

    @Bean
    public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }

    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }

}
Kotlin
import jakarta.jms.ConnectionFactory
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.config.DefaultJmsListenerContainerFactory

@Configuration(proxyBeanMethods = false)
class MyJmsConfiguration {

    @Bean
    fun myFactory(configurer: DefaultJmsListenerContainerFactoryConfigurer): DefaultJmsListenerContainerFactory {
        val factory = DefaultJmsListenerContainerFactory()
        val connectionFactory = getCustomConnectionFactory()
        configurer.configure(factory, connectionFactory)
        factory.setMessageConverter(MyMessageConverter())
        return factory
    }

    fun getCustomConnectionFactory() : ConnectionFactory? {
        return ...
    }

}

然后,您可以在任何使用@JmsListener注解的方法中使用工厂,如下所示:

Java
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @JmsListener(destination = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @JmsListener(destination = "someQueue", containerFactory = "myFactory")
    fun processMessage(content: String?) {
        // ...
    }

}

2. AMQP

2. AMQP

高级消息队列协议(AMQP)是一个平台中立的、基于消息的中间件的协议。Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。Spring Boot通过RabbitMQ为使用AMQP提供了几个便利功能,包括spring-boot-starter-amqp“Starter”。

2.1. RabbitMQ支持

RabbitMQ是一个基于AMQP协议的轻量级、可靠、可扩展和便携的消息代理。Spring使用RabbitMQ通过AMQP协议进行通信。

RabbitMQ的配置由spring.rabbitmq.*中的外部配置属性控制。例如,您可以在application.properties中声明以下部分:

属性
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
Yaml
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

或者,您可以使用addresses属性配置相同的连接:

属性
spring.rabbitmq.addresses=amqp://admin:secret@localhost
Yaml
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
以这种方式指定地址时,hostport属性将被忽略。如果地址使用amqps协议,则SSL支持将自动启用。

查看更多支持基于属性的配置选项,请参阅RabbitProperties。要配置Spring AMQP使用的RabbitMQ ConnectionFactory的更低级别细节,请定义一个ConnectionFactoryCustomizer bean。

如果上下文中存在ConnectionNameStrategy bean,则自动配置的CachingConnectionFactory将自动使用它来命名创建的连接。

要对RabbitTemplate进行应用范围的附加自定义,请使用RabbitTemplateCustomizer bean。

有关更多详细信息,请参阅了解RabbitMQ使用的协议AMQP

2.2. 发送消息

Spring的AmqpTemplateAmqpAdmin是自动配置的,您可以直接将它们自动装配到您自己的bean中,如下例所示:

Java
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final AmqpAdmin amqpAdmin;

    private final AmqpTemplate amqpTemplate;

    public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;
    }

    // ...

    public void someMethod() {
        this.amqpAdmin.getQueueInfo("someQueue");
    }

    public void someOtherMethod() {
        this.amqpTemplate.convertAndSend("hello");
    }

}
Kotlin
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {

    // ...

    fun someMethod() {
        amqpAdmin.getQueueInfo("someQueue")
    }

    fun someOtherMethod() {
        amqpTemplate.convertAndSend("hello")
    }

}
RabbitMessagingTemplate可以以类似的方式注入。如果定义了MessageConverter bean,则它将自动关联到自动配置的AmqpTemplate

如果需要,任何定义为bean的org.springframework.amqp.core.Queue将自动用于在RabbitMQ实例上声明相应的队列。

要重试操作,您可以在AmqpTemplate上启用重试(例如,在经纪人连接丢失的情况下):

属性
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
Yaml
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

默认情况下禁用重试。您还可以通过声明一个RabbitRetryTemplateCustomizer bean来以编程方式自定义RetryTemplate

如果需要创建更多的RabbitTemplate实例,或者想要覆盖默认设置,Spring Boot提供了一个RabbitTemplateConfigurer bean,您可以使用它来初始化一个RabbitTemplate,其设置与自动配置使用的工厂相同。

2.3. 发送消息到流

要向特定流发送消息,请指定流的名称,如下例所示:

属性
spring.rabbitmq.stream.name=my-stream
Yaml
spring:
  rabbitmq:
    stream:
      name: "my-stream"

如果定义了MessageConverterStreamMessageConverterProducerCustomizer bean,则它将自动关联到自动配置的RabbitStreamTemplate

如果需要创建更多的RabbitStreamTemplate实例,或者想要覆盖默认设置,Spring Boot提供了一个RabbitStreamTemplateConfigurer bean,您可以使用它来初始化一个RabbitStreamTemplate,其设置与自动配置使用的工厂相同。

2.4. 接收消息

当Rabbit基础设施存在时,任何bean都可以用@RabbitListener注解来创建一个监听器端点。如果没有定义RabbitListenerContainerFactory,则会自动配置一个默认的SimpleRabbitListenerContainerFactory,您可以通过spring.rabbitmq.listener.type属性切换到直接容器。如果定义了MessageConverterMessageRecoverer bean,则会自动与默认工厂关联。

以下示例组件在someQueue队列上创建了一个监听器端点:

Java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @RabbitListener(queues = ["someQueue"])
    fun processMessage(content: String?) {
        // ...
    }

}
查看@EnableRabbit的Javadoc获取更多详细信息。

如果需要创建更多的RabbitListenerContainerFactory实例,或者想要覆盖默认设置,Spring Boot提供了SimpleRabbitListenerContainerFactoryConfigurerDirectRabbitListenerContainerFactoryConfigurer,您可以使用它们来初始化一个SimpleRabbitListenerContainerFactory和一个DirectRabbitListenerContainerFactory,并使用与自动配置使用的工厂相同的设置。

无论选择哪种容器类型都无关紧要。这两个bean都由自动配置暴露出来。

例如,以下配置类公开了另一个使用特定MessageConverter的工厂:

Java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }

    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }

}
Kotlin
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {

    @Bean
    fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
        val factory = SimpleRabbitListenerContainerFactory()
        val connectionFactory = getCustomConnectionFactory()
        configurer.configure(factory, connectionFactory)
        factory.setMessageConverter(MyMessageConverter())
        return factory
    }

    fun getCustomConnectionFactory() : ConnectionFactory? {
        return ...
    }

}

然后,您可以在任何使用@RabbitListener注解的方法中使用该工厂,如下所示:

Java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
    fun processMessage(content: String?) {
        // ...
    }

}

您可以启用重试来处理监听器抛出异常的情况。默认情况下,使用RejectAndDontRequeueRecoverer,但您可以定义自己的MessageRecoverer。当重试耗尽时,消息将被拒绝,并且如果代理配置为这样做,则将被丢弃或路由到死信交换。默认情况下,重试被禁用。您还可以通过声明RabbitRetryTemplateCustomizer bean来以编程方式自定义RetryTemplate

默认情况下,如果禁用重试并且监听器抛出异常,则传递将无限重试。您可以通过两种方式修改此行为:将defaultRequeueRejected属性设置为false,以便不尝试重新传递,或者抛出AmqpRejectAndDontRequeueException来表示应拒绝消息。当启用重试并达到最大传递尝试次数时,后者是使用的机制。

3. Apache Kafka支持

Apache Kafka 是通过提供 spring-kafka 项目的自动配置来支持的。

Kafka 配置受 spring.kafka.* 中的外部配置属性控制。例如,您可以在 application.properties 中声明以下部分:

属性
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
Yaml
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"
要在启动时创建一个主题,请添加一个类型为 NewTopic 的 bean。如果主题已经存在,则该 bean 将被忽略。

查看 KafkaProperties 获取更多支持的选项。

3.1. 发送消息

Spring 的 KafkaTemplate 是自动配置的,您可以直接在自己的 bean 中自动装配它,如下例所示:

Java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

    public void someMethod() {
        this.kafkaTemplate.send("someTopic", "Hello");
    }

}
Kotlin
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {

    // ...

    fun someMethod() {
        kafkaTemplate.send("someTopic", "Hello")
    }

}
如果定义了属性 spring.kafka.producer.transaction-id-prefix,将自动配置一个 KafkaTransactionManager。此外,如果定义了一个 RecordMessageConverter bean,则会自动将其关联到自动配置的 KafkaTemplate

3.2. 接收消息

当 Apache Kafka 基础设施存在时,任何 bean 都可以用 @KafkaListener 注解来创建一个监听器端点。如果未定义 KafkaListenerContainerFactory,则会自动配置一个默认的工厂,并使用 spring.kafka.listener.* 中定义的键。

以下组件在 someTopic 主题上创建一个监听器端点:

Java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @KafkaListener(topics = ["someTopic"])
    fun processMessage(content: String?) {
        // ...
    }

}

如果定义了一个 KafkaTransactionManager bean,则会自动将其关联到容器工厂。类似地,如果定义了 RecordFilterStrategyCommonErrorHandlerAfterRollbackProcessorConsumerAwareRebalanceListener bean,则会自动将其关联到默认工厂。

根据监听器类型,会将 RecordMessageConverterBatchMessageConverter bean 关联到默认工厂。如果仅为批量监听器存在 RecordMessageConverter bean,则会将其包装在 BatchMessageConverter 中。

自定义的 ChainedKafkaTransactionManager 必须标记为 @Primary,因为它通常引用自动配置的 KafkaTransactionManager bean。

3.3. Kafka Streams

Spring for Apache Kafka 提供了一个工厂 bean 来创建一个 StreamsBuilder 对象并管理其流的生命周期。只要类路径中存在 kafka-streams 并且通过 @EnableKafkaStreams 注解启用了 Kafka Streams,Spring Boot 就会自动配置所需的 KafkaStreamsConfiguration bean。

启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。前者可以使用 spring.kafka.streams.application-id 进行配置,默认为 spring.application.name(如果未设置)。后者可以全局设置,或仅为流覆盖。

使用专用属性可以使用其他附加属性;可以使用 spring.kafka.streams.properties 命名空间设置其他任意 Kafka 属性。另请参阅 其他 Kafka 属性 以获取更多信息。

要使用工厂 bean,请将 StreamsBuilder 注入到您的 @Bean 中,如下例所示:

Java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }

    private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
        return new KeyValue<>(key, value.toUpperCase());
    }

}
Kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafkaStreams
import org.springframework.kafka.support.serializer.JsonSerde

默认情况下,由 StreamBuilder 对象管理的流会自动启动。您可以使用 spring.kafka.streams.auto-startup 属性自定义此行为。

3.4. 额外的Kafka属性

自动配置支持的属性在附录的“集成属性”部分中显示。请注意,大部分这些属性(连字符或驼峰命名)直接映射到Apache Kafka的点分属性。有关详细信息,请参阅Apache Kafka文档。

不包含客户端类型(producerconsumeradminstreams)在其名称中的属性被视为通用属性,适用于所有客户端。大多数这些通用属性可以根据需要覆盖一个或多个客户端类型。

Apache Kafka将属性指定为高、中或低重要性。Spring Boot自动配置支持所有高重要性属性,一些选定的中和低重要性属性,以及没有默认值的任何属性。

只有Kafka支持的属性子集可以直接通过KafkaProperties类获得。如果您希望配置不受直接支持的附加属性的各个客户端类型,请使用以下属性:

属性
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
Yaml
spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"

这将把通用的prop.one Kafka属性设置为first(适用于生产者、消费者、管理员和流),将prop.two管理员属性设置为second,将prop.three消费者属性设置为third,将prop.four生产者属性设置为fourth,将prop.five流属性设置为fifth

您还可以按以下方式配置Spring Kafka的JsonDeserializer

属性
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
Yaml
spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"

类似地,您可以禁用JsonSerializer默认行为,不在标头中发送类型信息:

属性
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
Yaml
spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false
以这种方式设置的属性将覆盖Spring Boot明确支持的任何配置项。

3.5. 使用嵌入式Kafka进行测试

Spring for Apache Kafka提供了一种方便的方式来使用嵌入式Apache Kafka代理测试项目。要使用此功能,请在测试类上使用@EmbeddedKafka,该注解来自spring-kafka-test模块。有关更多信息,请参阅Spring for Apache Kafka的参考手册

为使Spring Boot自动配置与上述嵌入式Apache Kafka代理配合工作,您需要将嵌入式代理地址(由EmbeddedKafkaBroker填充)重新映射为Apache Kafka的Spring Boot配置属性。有几种方法可以实现:

  • 在测试类中提供一个系统属性,将嵌入式代理地址映射到spring.kafka.bootstrap-servers

Java
static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
Kotlin
init {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
}
  • @EmbeddedKafka注解上配置一个属性名称:

Java
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

    // ...

}
Kotlin
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka

@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

    // ...

}
  • 在配置属性中使用占位符:

属性
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
Yaml
spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"

4. Apache Pulsar支持

Apache Pulsar得到支持,提供了Spring for Apache Pulsar项目的自动配置。

org.springframework.pulsar:spring-pulsar在类路径上时,Spring Boot将自动配置并注册经典(命令式)的Spring for Apache Pulsar组件。当org.springframework.pulsar:spring-pulsar-reactive在类路径上时,它也会为响应式组件执行相同的操作。

spring-boot-starter-pulsarspring-boot-starter-pulsar-reactive“启动器”可方便地收集用于命令式和响应式使用的依赖项。

4.1. 连接到Pulsar

当您使用Pulsar启动器时,Spring Boot将自动配置并注册一个PulsarClient bean。

默认情况下,应用程序会尝试连接到本地Pulsar实例,地址为pulsar://localhost:6650。这可以通过将spring.pulsar.client.service-url属性设置为不同的值来调整。

该值必须是有效的Pulsar协议 URL

您可以通过指定任何以spring.pulsar.client.*为前缀的应用程序属性来配置客户端。

如果您需要更多对配置的控制,请考虑注册一个或多个PulsarClientBuilderCustomizer bean。

4.1.1. 认证

要连接到需要认证的Pulsar集群,您需要指定要使用的认证插件,方法是设置pluginClassName和插件所需的任何参数。您可以将参数设置为参数名称到参数值的映射。以下示例显示了如何配置AuthenticationOAuth2插件。

属性
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param[issuerUrl]=https://auth.server.cloud/
spring.pulsar.client.authentication.param[privateKey]=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
Yaml
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: https://auth.server.cloud/
          privateKey: file:///Users/some-key.json
          audience: urn:sn:acme:dev:my-instance

您需要确保在spring.pulsar.client.authentication.param.*下定义的名称与您的认证插件期望的名称完全匹配(通常是驼峰式)。Spring Boot不会尝试对这些条目进行任何形式的松散绑定。

例如,如果您想要为AuthenticationOAuth2认证插件配置发行者URL,则必须使用spring.pulsar.client.authentication.param.issuerUrl。如果使用其他形式,如issuerurlissuer-url,则设置将不会应用于插件。

这种缺乏松散绑定也使得对认证参数使用环境变量存在问题,因为在翻译过程中会丢失大小写敏感性。如果您使用环境变量作为参数,则需要按照Spring for Apache Pulsar参考文档中的这些步骤才能正常工作。

4.1.2. SSL

默认情况下,Pulsar客户端与Pulsar服务以纯文本通信。您可以按照Spring for Apache Pulsar参考文档中的这些步骤启用TLS加密。

有关客户端和认证的完整详细信息,请参阅Spring for Apache Pulsar的参考文档

4.2. 响应式连接到Pulsar

ReactivePulsarClient bean。

ReactivePulsarClient适配先前描述的PulsarClient的实例。因此,请按照前一节的说明配置PulsarClient,以供ReactivePulsarClient使用。

4.3. 连接到Pulsar管理

Spring for Apache Pulsar的PulsarAdministration客户端也会自动配置。

默认情况下,应用程序会尝试连接到本地Pulsar实例,地址为http://localhost:8080。这可以通过将spring.pulsar.admin.service-url属性设置为形式为(http|https)://<host>:<port>的不同值来调整。

如果您需要更多对配置的控制,请考虑注册一个或多个PulsarAdminBuilderCustomizer bean。

4.3.1. 认证

当访问需要认证的Pulsar集群时,管理客户端需要与常规Pulsar客户端相同的安全配置。您可以通过将spring.pulsar.client.authentication替换为spring.pulsar.admin.authentication来使用前面提到的认证配置

要在启动时创建主题,请添加一个类型为PulsarTopic的bean。如果主题已经存在,则会忽略该bean。

4.4. 发送消息

Spring的PulsarTemplate已自动配置,您可以使用它来发送消息,如下例所示:

Java
import org.apache.pulsar.client.api.PulsarClientException;

import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final PulsarTemplate<String> pulsarTemplate;

    public MyBean(PulsarTemplate<String> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }

    public void someMethod() throws PulsarClientException {
        this.pulsarTemplate.send("someTopic", "Hello");
    }

}
Kotlin
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {

    @Throws(PulsarClientException::class)
    fun someMethod() {
        pulsarTemplate.send("someTopic", "Hello")
    }

}

PulsarTemplate依赖于PulsarProducerFactory来创建底层的Pulsar生产者。Spring Boot自动配置还提供了此生产者工厂,默认情况下,它会缓存创建的生产者。您可以通过指定任何以spring.pulsar.producer.*spring.pulsar.producer.cache.*为前缀的应用程序属性来配置生产者工厂和缓存设置。

如果您需要更多对生产者工厂配置的控制,请考虑注册一个或多个ProducerBuilderCustomizer bean。这些自定义器将应用于所有创建的生产者。您还可以在发送消息时传递一个ProducerBuilderCustomizer,以仅影响当前生产者。

如果您需要更多对发送消息的控制,请在发送消息时传递一个TypedMessageBuilderCustomizer

4.5. 响应式发送消息

当激活响应式自动配置时,Spring的ReactivePulsarTemplate会被自动配置,您可以像下面的示例一样使用它来发送消息:

Java
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final ReactivePulsarTemplate<String> pulsarTemplate;

    public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }

    public void someMethod() {
        this.pulsarTemplate.send("someTopic", "Hello").subscribe();
    }

}
Kotlin
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {

    fun someMethod() {
        pulsarTemplate.send("someTopic", "Hello").subscribe()
    }

}

ReactivePulsarTemplate依赖于ReactivePulsarSenderFactory来实际创建底层的发送器。Spring Boot自动配置还提供了这个发送器工厂,默认情况下会缓存它创建的生产者。您可以通过指定任何以spring.pulsar.producer.*spring.pulsar.producer.cache.*为前缀的应用程序属性来配置发送器工厂和缓存设置。

如果您需要更多控制发送器工厂的配置,请考虑注册一个或多个ReactiveMessageSenderBuilderCustomizer bean。这些定制器将应用于所有创建的发送器。您还可以在发送消息时传入一个ReactiveMessageSenderBuilderCustomizer,以仅影响当前发送器。

如果您需要更多控制要发送的消息,可以在发送消息时传入一个MessageSpecBuilderCustomizer

4.6. 接收消息

当Apache Pulsar基础设施存在时,任何bean都可以使用@PulsarListener注解来创建一个监听器端点。以下组件在someTopic主题上创建了一个监听器端点:

Java
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @PulsarListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @PulsarListener(topics = ["someTopic"])
    fun processMessage(content: String?) {
        // ...
    }

}

Spring Boot自动配置提供了所有PulsarListener所需的组件,如PulsarListenerContainerFactory和它用于构建底层Pulsar消费者的消费者工厂。您可以通过指定任何以spring.pulsar.listener.*spring.pulsar.consumer.*为前缀的应用程序属性来配置这些组件。

如果您需要更多控制消费者工厂的配置,请考虑注册一个或多个ConsumerBuilderCustomizer bean。这些定制器将应用于工厂创建的所有消费者,因此所有@PulsarListener实例。您还可以通过设置@PulsarListener注解的consumerCustomizer属性来自定义单个监听器。

4.7. 响应式接收消息

当Apache Pulsar基础设施存在且激活了响应式自动配置时,任何bean都可以使用@ReactivePulsarListener注解来创建一个响应式监听器端点。以下组件在someTopic主题上创建了一个响应式监听器端点:

Java
import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @ReactivePulsarListener(topics = "someTopic")
    public Mono<Void> processMessage(String content) {
        // ...
        return Mono.empty();
    }

}
Kotlin
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono

@Component
class MyBean {

    @ReactivePulsarListener(topics = ["someTopic"])
    fun processMessage(content: String?): Mono<Void> {
        // ...
        return Mono.empty()
    }

}

Spring Boot自动配置提供了所有ReactivePulsarListener所需的组件,如ReactivePulsarListenerContainerFactory和它用于构建底层响应式Pulsar消费者的消费者工厂。您可以通过指定任何以spring.pulsar.listener.spring.pulsar.consumer.为前缀的应用程序属性来配置这些组件。

如果您需要更多控制消费者工厂的配置,请考虑注册一个或多个ReactiveMessageConsumerBuilderCustomizer bean。这些定制器将应用于工厂创建的所有消费者,因此所有@ReactivePulsarListener实例。您还可以通过设置@ReactivePulsarListener注解的consumerCustomizer属性来自定义单个监听器。

4.8. 读取消息

Pulsar读取器接口使应用程序能够手动管理游标。当您使用读取器连接到主题时,需要指定读取器连接到主题时从哪条消息开始读取。

当Apache Pulsar基础设施存在时,任何bean都可以使用@PulsarReader注解来使用读取器消费消息。以下组件创建了一个读取器端点,从someTopic主题的开头开始读取消息:

Java
import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @PulsarReader(topics = "someTopic", startMessageId = "earliest")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component

@Component
class MyBean {

    @PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
    fun processMessage(content: String?) {
        // ...
    }

}

@PulsarReader依赖于PulsarReaderFactory来创建底层的Pulsar读取器。Spring Boot自动配置提供了这个读取器工厂,可以通过设置任何以spring.pulsar.reader.*为前缀的应用程序属性进行自定义。

如果您需要更多控制读取器工厂的配置,请考虑注册一个或多个ReaderBuilderCustomizer bean。这些定制器将应用于工厂创建的所有读取器,因此所有@PulsarReader实例。您还可以通过设置@PulsarReader注解的readerCustomizer属性来自定义单个读取器。

4.9. 以响应式方式读取消息

当Apache Pulsar基础设施存在并且激活了Reactive自动配置时,Spring的ReactivePulsarReaderFactory会被提供,您可以使用它来创建一个读取消息的响应式读取器。以下组件使用提供的工厂创建一个读取器,并从someTopic主题中读取5分钟前的单条消息:

Java
import java.time.Instant;
import java.util.List;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;

    public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
        this.pulsarReaderFactory = pulsarReaderFactory;
    }

    public void someMethod() {
        ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
            .topic("someTopic")
            .startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
        Mono<Message<String>> message = this.pulsarReaderFactory
            .createReader(Schema.STRING, List.of(readerBuilderCustomizer))
            .readOne();
        // ...
    }

}
Kotlin
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant

@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {

    fun someMethod() {
        val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
            readerBuilder: ReactiveMessageReaderBuilder<String> ->
                readerBuilder
                    .topic("someTopic")
                    .startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5))
        }
        val message = pulsarReaderFactory
                .createReader(Schema.STRING, listOf(readerBuilderCustomizer))
                .readOne()
        // ...
    }

}

Spring Boot自动配置提供了这个读取器工厂,可以通过设置任何以spring.pulsar.reader.*为前缀的应用程序属性进行自定义。

如果您需要更多控制读取器工厂配置,考虑在使用工厂创建读取器时传入一个或多个ReactiveMessageReaderBuilderCustomizer实例。

如果您需要更多控制读取器工厂配置,考虑注册一个或多个ReactiveMessageReaderBuilderCustomizer bean。这些自定义器将应用于所有创建的读取器。您还可以在创建读取器时传入一个或多个ReactiveMessageReaderBuilderCustomizer,以仅将自定义应用于创建的读取器。

有关上述组件的更多详细信息以及发现其他可用功能,请参阅Spring for Apache Pulsar 参考文档

4.10. 其他Pulsar属性

自动配置支持的属性在附录的“集成属性”部分中列出。请注意,这些属性(连字符或驼峰命名)在很大程度上直接映射到Apache Pulsar配置属性。有关详细信息,请参阅Apache Pulsar文档。

仅有Pulsar支持的属性可以直接通过PulsarProperties类获得。如果您希望使用不受直接支持的其他属性调整自动配置的组件,可以使用每个前述组件支持的自定义器。

5. RSocket

RSocket是一种用于字节流传输的二进制协议。它通过单个连接实现了对称交互模型,通过异步消息传递来实现。

Spring Framework的spring-messaging模块提供了对RSocket请求者和响应者的支持,既可以在客户端上使用,也可以在服务器端上使用。查看Spring Framework参考文档中的RSocket部分以获取更多详细信息,包括RSocket协议概述。

5.1. RSocket策略自动配置

Spring Boot自动配置了一个RSocketStrategies bean,提供了所有编码和解码RSocket负载所需的基础设施。默认情况下,自动配置将尝试按顺序配置以下内容:

  1. CBOR编解码器与Jackson

  2. JSON编解码器与Jackson

spring-boot-starter-rsocket starter提供了这两个依赖项。查看Jackson支持部分以了解更多自定义可能性。

开发人员可以通过创建实现RSocketStrategiesCustomizer接口的bean来自定义RSocketStrategies组件。请注意,它们的@Order很重要,因为它决定了编解码器的顺序。

5.2. RSocket服务器自动配置

Spring Boot提供了RSocket服务器自动配置。所需的依赖项由spring-boot-starter-rsocket提供。

Spring Boot允许从WebFlux服务器公开RSocket over WebSocket,或者独立启动一个独立的RSocket服务器。这取决于应用程序的类型和配置。

对于WebFlux应用程序(即WebApplicationType.REACTIVE类型),只有在以下属性匹配时,RSocket服务器才会插入到Web服务器中:

属性
spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocket
Yaml
spring:
  rsocket:
    server:
      mapping-path: "/rsocket"
      transport: "websocket"
将RSocket插入到Web服务器中仅支持Reactor Netty,因为RSocket本身是使用该库构建的。

或者,作为独立的嵌入式服务器启动RSocket TCP或websocket服务器。除了依赖要求之外,唯一需要的配置是为该服务器定义一个端口:

属性
spring.rsocket.server.port=9898
Yaml
spring:
  rsocket:
    server:
      port: 9898

5.3. Spring消息RSocket支持

Spring Boot将为RSocket自动配置Spring消息基础设施。

这意味着Spring Boot将创建一个RSocketMessageHandler bean,用于处理RSocket请求到您的应用程序。

5.4. 使用RSocketRequester调用RSocket服务

一旦服务器和客户端之间建立了RSocket通道,任何一方都可以向另一方发送或接收请求。

作为服务器,您可以在RSocket @Controller的任何处理程序方法中注入一个RSocketRequester实例。作为客户端,您需要首先配置和建立RSocket连接。Spring Boot为这种情况自动配置了一个RSocketRequester.Builder,其中包含预期的编解码器,并应用任何RSocketConnectorConfigurer bean。

RSocketRequester.Builder实例是一个原型bean,这意味着每个注入点都会提供一个新实例。这是有意为之的,因为这个构建器是有状态的,您不应该使用相同实例创建具有不同设置的请求者。

以下代码展示了一个典型的示例:

Java
import reactor.core.publisher.Mono;

import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    private final RSocketRequester rsocketRequester;

    public MyService(RSocketRequester.Builder rsocketRequesterBuilder) {
        this.rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898);
    }

    public Mono<User> someRSocketCall(String name) {
        return this.rsocketRequester.route("user").data(name).retrieveMono(User.class);
    }

}
Kotlin
import org.springframework.messaging.rsocket.RSocketRequester
import org.springframework.stereotype.Service
import reactor.core.publisher.Mono

@Service
class MyService(rsocketRequesterBuilder: RSocketRequester.Builder) {

    private val rsocketRequester: RSocketRequester

    init {
        rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898)
    }

    fun someRSocketCall(name: String): Mono<User> {
        return rsocketRequester.route("user").data(name).retrieveMono(
            User::class.java
        )
    }

}

6. Spring集成

Spring Boot为使用Spring集成提供了几项便利功能,包括spring-boot-starter-integration“Starter”。Spring集成提供了对消息传递以及其他传输方式(如HTTP、TCP等)的抽象。如果在类路径中可用Spring集成,则通过@EnableIntegration注解进行初始化。

Spring集成的轮询逻辑依赖于自动配置的TaskScheduler。默认的PollerMetadata(每秒轮询无限数量的消息)可以通过spring.integration.poller.*配置属性进行自定义。

Spring Boot还配置了一些功能,这些功能是由额外的Spring集成模块的存在触发的。如果类路径中也有spring-integration-jmx,则会通过JMX发布消息处理统计信息。如果可用spring-integration-jdbc,则默认数据库模式可以在启动时创建,如下所示:

属性
spring.integration.jdbc.initialize-schema=always
Yaml
spring:
  integration:
    jdbc:
      initialize-schema: "always"

如果可用spring-integration-rsocket,开发人员可以使用"spring.rsocket.server.*"属性配置一个RSocket服务器,并让其使用IntegrationRSocketEndpointRSocketOutboundGateway组件来处理传入的RSocket消息。此基础设施可以处理Spring集成RSocket通道适配器和@MessageMapping处理程序(假设已配置"spring.integration.rsocket.server.message-mapping-enabled")。

Spring Boot还可以使用配置属性自动配置一个ClientRSocketConnector

属性
# 连接到TCP上的RSocket服务器
spring.integration.rsocket.client.host=example.org
spring.integration.rsocket.client.port=9898
Yaml
# 连接到TCP上的RSocket服务器
spring:
  integration:
    rsocket:
      client:
        host: "example.org"
        port: 9898
属性
# 连接到WebSocket上的RSocket服务器
spring.integration.rsocket.client.uri=ws://example.org
Yaml
# 连接到WebSocket上的RSocket服务器
spring:
  integration:
    rsocket:
      client:
        uri: "ws://example.org"

查看IntegrationAutoConfigurationIntegrationProperties类以获取更多详细信息。

7. WebSockets

Spring Boot为嵌入式Tomcat、Jetty和Undertow提供了WebSockets自动配置。如果将war文件部署到独立容器中,Spring Boot会假定容器负责配置其WebSocket支持。

Spring框架为MVC Web应用程序提供了丰富的WebSocket支持,可以通过spring-boot-starter-websocket模块轻松访问。

WebSocket支持也适用于响应式Web应用程序,需要在spring-boot-starter-webflux旁边包含WebSocket API:

<dependency>
    <groupId>jakarta.websocket</groupId>
    <artifactId>jakarta.websocket-api</artifactId>
</dependency>

8. 接下来阅读什么

下一节描述如何在应用程序中启用IO功能。您可以在本节中阅读有关缓存邮件验证REST客户端等内容。