|
此版本仍在开发中,尚未视为稳定版。如需最新稳定版本,请使用 Spring Boot 4.0.4! |
AMQP
高级消息队列协议(AMQP)是一种面向消息中间件的、与平台无关的线路级协议。
Spring AMQP 项目将核心 Spring 理念应用于基于 AMQP 的消息解决方案开发。
Spring Boot 为使用 AMQP 提供了多项便利:通用 AMQP 1.0 支持由 spring-boot-starter-amqp starter 提供,而特定的 RabbitMQ 支持则可通过 spring-boot-starter-rabbitmq starter 获得。
通用 AMQP 1.0 支持
AMQP 1.0 得到了多种代理和消息服务的支持,不仅限于 RabbitMQ,还包括 ActiveMQ、Azure Service Bus 等。
Spring AMQP 通过基于 Qpid ProtonJ2 客户端库 的 org.springframework.amqp:spring-amqp-client,提供了对 AMQP 1.0 的通用支持。
AMQP 配置由 spring.amqp.* 中的外部配置属性控制。
例如,您可以在配置中声明以下内容:
-
Properties
-
YAML
spring.amqp.host=localhost
spring.amqp.port=5672
spring.amqp.username=admin
spring.amqp.password=secret
spring:
amqp:
host: "localhost"
port: 5672
username: "admin"
password: "secret"
要配置自动配置的 AmqpConnectionFactory 的连接选项,请定义一个 ConnectionOptionsCustomizer Bean。
发送消息
Spring 的 AmqpClient 是自动配置的,您可以直接将其自动装配到您自己的 Bean 中,如下例所示:
-
Java
-
Kotlin
import org.springframework.amqp.client.AmqpClient;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final AmqpClient amqpClient;
public MyBean(AmqpClient amqpClient) {
this.amqpClient = amqpClient;
}
// ...
public void sendMessage(String msg) {
this.amqpClient.to("/queues/test").body(msg).send();
}
}
import org.springframework.amqp.client.AmqpClient
import org.springframework.stereotype.Component
@Component
class MyBean(private val amqpClient: AmqpClient) {
// ...
fun someOtherMethod(msg: String) {
amqpClient.to("/queues/test").body(msg).send()
}
}
如果定义了 MessageConverter Bean,它将自动与自动配置的 AmqpClient 关联。
如果未定义此类转换器且可用 Jackson,则使用 JacksonJsonMessageConverter。
特定于客户端的设置可以按如下方式配置:
-
Properties
-
YAML
spring.amqp.client.default-to-address=/queues/default_queue
spring.amqp.client.completion-timeout=500ms
spring:
amqp:
client:
default-to-address: "/queues/default_queue"
completion-timeout: "500ms"
要进一步配置自动配置的 AmqpClient,请定义一个 AmqpClientCustomizer Bean。
RabbitMQ 支持
RabbitMQ 是一个基于 AMQP 协议的轻量级、可靠、可扩展且可移植的消息代理。 Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。
RabbitMQ 的配置由 spring.rabbitmq.* 中的外部配置属性进行控制。
例如,您可以在 application.properties 中声明以下配置段:
-
Properties
-
YAML
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
rabbitmq:
host: "localhost"
port: 5672
username: "admin"
password: "secret"
或者,您也可以使用 addresses 属性来配置相同的连接:
-
Properties
-
YAML
spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
rabbitmq:
addresses: "amqp://admin:secret@localhost"
以这种方式指定地址时,host 和 port 属性将被忽略。
如果地址使用 amqps 协议,则会自动启用 SSL 支持。 |
请参阅 RabbitProperties 以了解更多支持的基于属性的配置选项。
要配置 Spring AMQP 所使用的 RabbitMQ ConnectionFactory 的底层细节,请定义一个 ConnectionFactoryCustomizer Bean。
如果上下文中存在一个 ConnectionNameStrategy Bean,它将自动用于命名由自动配置的 CachingConnectionFactory 创建的连接。
要对 RabbitTemplate 进行应用程序范围的附加自定义,请使用 RabbitTemplateCustomizer bean。
| 更多详情请参见 理解 RabbitMQ 所使用的 AMQP 协议。 |
发送消息
Spring 的 AmqpTemplate 和 AmqpAdmin 是自动配置的,您可以直接将它们自动装配到您自己的 Bean 中,如下例所示:
-
Java
-
Kotlin
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
// ...
public void someMethod() {
this.amqpAdmin.getQueueInfo("someQueue");
}
public void someOtherMethod() {
this.amqpTemplate.convertAndSend("hello");
}
}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {
// ...
fun someMethod() {
amqpAdmin.getQueueInfo("someQueue")
}
fun someOtherMethod() {
amqpTemplate.convertAndSend("hello")
}
}
RabbitMessagingTemplate 可以以类似的方式注入。
如果定义了 MessageConverter bean,它将自动关联到自动配置的 AmqpTemplate。 |
如有必要,任何被定义为 Bean 的 Queue 都会自动用于在 RabbitMQ 实例上声明对应的队列。
若要重试操作,您可以在 AmqpTemplate 上启用重试功能(例如,在代理连接丢失的情况下):
-
Properties
-
YAML
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
rabbitmq:
template:
retry:
enabled: true
initial-interval: "2s"
默认情况下已禁用重试。
您还可以通过声明一个 RabbitTemplateRetrySettingsCustomizer Bean,以编程方式自定义 RetryTemplate。
如果您需要创建更多 RabbitTemplate 实例,或者想要覆盖默认设置,Spring Boot 提供了一个 RabbitTemplateConfigurer Bean,您可以使用它来初始化一个 RabbitTemplate,其配置与自动配置所使用的工厂相同。
如果上下文中存在类型为 RabbitTemplateObservationConvention 的 Bean,它将自动在 RabbitTemplate 上进行配置。
向流发送消息
要向特定的流发送消息,请指定该流的名称,如下例所示:
-
Properties
-
YAML
spring.rabbitmq.stream.name=my-stream
spring:
rabbitmq:
stream:
name: "my-stream"
如果定义了 MessageConverter、StreamMessageConverter、ProducerCustomizer 或 RabbitStreamTemplateObservationConvention Bean,它将自动关联到自动配置的 RabbitStreamTemplate。
如果您需要创建更多 RabbitStreamTemplate 实例,或者想要覆盖默认设置,Spring Boot 提供了一个 RabbitStreamTemplateConfigurer Bean,您可以使用它来初始化一个 RabbitStreamTemplate,其配置与自动配置所使用的工厂相同。
SSL
要在 RabbitMQ Streams 中使用 SSL,请将 spring.rabbitmq.stream.ssl.enabled 设置为 true,或设置 spring.rabbitmq.stream.ssl.bundle 以配置要使用的 SSL 捆绑包。
接收消息
当 Rabbit 基础设施存在时,任何 Bean 都可以使用 @RabbitListener 进行注解,以创建监听器端点。
如果未定义 RabbitListenerContainerFactory,系统将自动配置一个默认的 SimpleRabbitListenerContainerFactory,并且您可以通过 spring.rabbitmq.listener.type 属性切换到直接容器模式。
如果定义了 MessageConverter、MessageRecoverer 或 RabbitListenerObservationConvention Bean,它将自动与默认工厂关联。
以下示例组件在 someQueue 队列上创建一个监听器端点:
-
Java
-
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"])
fun processMessage(content: String?) {
// ...
}
}
请参阅 @EnableRabbit 以获取更多详情。 |
如果您需要创建更多 RabbitListenerContainerFactory 实例,或者想要覆盖默认设置,Spring Boot 提供了一个 SimpleRabbitListenerContainerFactoryConfigurer 和一个 DirectRabbitListenerContainerFactoryConfigurer,您可以使用它们来初始化 SimpleRabbitListenerContainerFactory 和 DirectRabbitListenerContainerFactory,其配置与自动配置所使用的工厂相同。
| 你选择哪种容器类型并不重要。 这两个 Bean 由自动配置暴露出来。 |
例如,以下配置类公开了另一个使用特定 MessageConverter 的工厂:
-
Java
-
Kotlin
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.rabbitmq.autoconfigure.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.rabbitmq.autoconfigure.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {
@Bean
fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
val factory = SimpleRabbitListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory {
return ...
}
}
然后,您可以在任何使用 @RabbitListener 注解的方法中使用该工厂,如下所示:
-
Java
-
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
您可以启用重试机制,以处理监听器抛出异常的情况。
默认情况下,使用的是 RejectAndDontRequeueRecoverer,但您也可以定义自己的 MessageRecoverer。
当重试次数用尽后,消息将被拒绝,并根据代理(broker)的配置,要么被丢弃,要么被路由到死信交换器(dead-letter exchange)。
默认情况下,重试功能是禁用的。
您还可以通过声明一个 RabbitListenerRetrySettingsCustomizer Bean,以编程方式自定义 RetryPolicy。
默认情况下,如果重试被禁用且监听器抛出异常,交付将无限次重试。
您可以通过两种方式修改此行为:将 defaultRequeueRejected 属性设置为 false,以便不尝试任何重新交付;或者抛出一个 AmqpRejectAndDontRequeueException 以信号表示应拒绝该消息。
后者是在启用重试且达到最大交付尝试次数时使用的机制。 |