此版本仍在开发中,尚未被视为稳定版本。如需最新稳定版本,请使用 Spring Boot 4.0.4!spring-doc.cadn.net.cn

AMQP

高级消息队列协议(AMQP)是一种与平台无关的、面向消息中继的线级协议。 Spring AMQP项目将核心Spring概念应用于基于AMQP的消息解决方案的开发。 Spring Boot为使用AMQP提供了多种便利:通用的AMQP 1.0支持由spring-boot-starter-amqpStarters提供,而特定的RabbitMQ支持则通过spring-boot-starter-rabbitmqStarters提供。spring-doc.cadn.net.cn

通用 AMQP 1.0 支持

AMQP 1.0 由除 RabbitMQ 以外的多个代理和消息服务支持,包括 ActiveMQ、Azure Service Bus 等。 Spring AMQP 通过 org.springframework.amqp:spring-amqp-client 提供对 AMQP 1.0 的 通用支持,该支持基于 Qpid ProtonJ2 客户端库spring-doc.cadn.net.cn

AMQP 配置由外部配置属性控制,位于 spring.amqp.* 中。 例如,您可以在配置中声明以下内容:spring-doc.cadn.net.cn

spring.amqp.host=localhost
spring.amqp.port=5672
spring.amqp.username=admin
spring.amqp.password=secret
spring:
  amqp:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

要配置自动配置的 连接选项 AmqpConnectionFactory,请定义一个 ConnectionOptionsCustomizer bean。spring-doc.cadn.net.cn

发送消息

Spring 的 AmqpClient 是自动配置的,您可以直接将其自动装配到您自己的 Bean 中,如下例所示:spring-doc.cadn.net.cn

import org.springframework.amqp.client.AmqpClient;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final AmqpClient amqpClient;

	public MyBean(AmqpClient amqpClient) {
		this.amqpClient = amqpClient;
	}

	// ...

	public void sendMessage(String msg) {
		this.amqpClient.to("/queues/test").body(msg).send();
	}

}
import org.springframework.amqp.client.AmqpClient
import org.springframework.stereotype.Component

@Component
class MyBean(private val amqpClient: AmqpClient) {

	// ...

	fun someOtherMethod(msg: String) {
		amqpClient.to("/queues/test").body(msg).send()
	}

}

如果定义了一个 MessageConverter 的 bean,它会自动与自动配置的 AmqpClient 关联。 如果没有定义这样的转换器,并且 Jackson 可用,则使用 JacksonJsonMessageConverterspring-doc.cadn.net.cn

可以按如下方式配置客户端特定的设置:spring-doc.cadn.net.cn

spring.amqp.client.default-to-address=/queues/default_queue
spring.amqp.client.completion-timeout=500ms
spring:
  amqp:
    client:
      default-to-address: "/queues/default_queue"
      completion-timeout: "500ms"

要进一步配置自动配置的 AmqpClient,请定义一个 AmqpClientCustomizer bean。spring-doc.cadn.net.cn

RabbitMQ 支持

RabbitMQ 是一个基于 AMQP 协议的轻量级、可靠、可扩展且可移植的消息代理。 Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。spring-doc.cadn.net.cn

RabbitMQ 配置由 spring.rabbitmq.* 中的外部配置属性控制。 例如,您可以在 application.properties 中声明以下部分:spring-doc.cadn.net.cn

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

或者,您可以使用 addresses 属性配置相同的连接:spring-doc.cadn.net.cn

spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
以这种方式指定地址时,hostport 属性将被忽略。 如果地址使用 amqps 协议,则会自动启用 SSL 支持。

有关更多支持的基于属性的配置选项,请参见 RabbitProperties。 要配置 Spring AMQP 使用的 RabbitMQ ConnectionFactory 的底层细节,请定义一个 ConnectionFactoryCustomizer bean。spring-doc.cadn.net.cn

如果上下文中存在 ConnectionNameStrategy bean,它将自动用于命名由自动配置的 CachingConnectionFactory 创建的连接。spring-doc.cadn.net.cn

要对 RabbitTemplate 进行应用程序范围的附加自定义,请使用 RabbitTemplateCustomizer bean。spring-doc.cadn.net.cn

查看 了解 RabbitMQ 所使用的 AMQP 协议 以获取更多信息。

发送消息

Spring 的 AmqpTemplateAmqpAdmin 是自动配置的,您可以将它们直接自动装配到您自己的 Bean 中,如下例所示:spring-doc.cadn.net.cn

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final AmqpAdmin amqpAdmin;

	private final AmqpTemplate amqpTemplate;

	public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
		this.amqpAdmin = amqpAdmin;
		this.amqpTemplate = amqpTemplate;
	}

	// ...

	public void someMethod() {
		this.amqpAdmin.getQueueInfo("someQueue");
	}

	public void someOtherMethod() {
		this.amqpTemplate.convertAndSend("hello");
	}

}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {

	// ...

	fun someMethod() {
		amqpAdmin.getQueueInfo("someQueue")
	}

	fun someOtherMethod() {
		amqpTemplate.convertAndSend("hello")
	}

}
RabbitMessagingTemplate 可以以类似的方式注入。 如果定义了 MessageConverter bean,它会自动关联到自动配置的 AmqpTemplate

如有必要,任何定义为 bean 的 Queue 都会自动用于在 RabbitMQ 实例上声明相应的队列。spring-doc.cadn.net.cn

要重试操作,您可以在 AmqpTemplate 上启用重试(例如,在代理连接丢失的情况下):spring-doc.cadn.net.cn

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

重试默认是禁用的。 您还可以通过声明一个 RetryTemplate 程序化地自定义它。spring-doc.cadn.net.cn

如果您需要创建更多 RabbitTemplate 实例,或者想要覆盖默认设置,Spring Boot 提供了一个 RabbitTemplateConfigurer Bean,您可以使用它来初始化一个 RabbitTemplate,其设置与自动配置所使用的工厂相同。spring-doc.cadn.net.cn

如果上下文中存在类型为 RabbitTemplateObservationConvention 的 Bean,它将自动配置在 RabbitTemplate 上。spring-doc.cadn.net.cn

向流发送消息

要向特定流发送消息,请指定流的名称,如下例所示:spring-doc.cadn.net.cn

spring.rabbitmq.stream.name=my-stream
spring:
  rabbitmq:
    stream:
      name: "my-stream"

如果您需要创建更多 RabbitStreamTemplate 实例,或者想要覆盖默认设置,Spring Boot 提供了一个 RabbitStreamTemplateConfigurer Bean,您可以使用它来初始化一个 RabbitStreamTemplate,其设置与自动配置所使用的工厂相同。spring-doc.cadn.net.cn

SSL

要使用 RabbitMQ Streams 的 SSL,请将 spring.rabbitmq.stream.ssl.enabled 设置为 true 或将 spring.rabbitmq.stream.ssl.bundle 设置为配置使用的 SSL 套件spring-doc.cadn.net.cn

接收消息

当存在Rabbit基础设施时,任何bean都可以使用@RabbitListener进行注解以创建一个监听器端点。 如果没有定义RabbitListenerContainerFactory,则会自动配置一个默认的SimpleRabbitListenerContainerFactory,您可以使用spring.rabbitmq.listener.type属性切换到直接容器。 如果定义了MessageConverterMessageRecovererRabbitListenerObservationConvention bean,它将自动与默认工厂关联。spring-doc.cadn.net.cn

以下示例组件在 someQueue 队列上创建了一个监听器端点:spring-doc.cadn.net.cn

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"])
	fun processMessage(content: String?) {
		// ...
	}

}
有关更多详细信息,请参阅 @EnableRabbit

如果您需要创建更多 RabbitListenerContainerFactory 实例,或者想要覆盖默认设置,Spring Boot 提供了一个 SimpleRabbitListenerContainerFactoryConfigurer 和一个 DirectRabbitListenerContainerFactoryConfigurer,您可以使用它们来初始化 SimpleRabbitListenerContainerFactoryDirectRabbitListenerContainerFactory,其设置与自动配置所使用的工厂相同。spring-doc.cadn.net.cn

无论你选择哪种容器类型都无关紧要。 这两个 bean 是由自动配置暴露的。

例如,以下配置类公开了另一个使用特定 MessageConverter 的工厂:spring-doc.cadn.net.cn

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.rabbitmq.autoconfigure.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

	@Bean
	public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		ConnectionFactory connectionFactory = getCustomConnectionFactory();
		configurer.configure(factory, connectionFactory);
		factory.setMessageConverter(new MyMessageConverter());
		return factory;
	}

	private ConnectionFactory getCustomConnectionFactory() {
		return ...
	}

}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.rabbitmq.autoconfigure.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {

	@Bean
	fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
		val factory = SimpleRabbitListenerContainerFactory()
		val connectionFactory = getCustomConnectionFactory()
		configurer.configure(factory, connectionFactory)
		factory.setMessageConverter(MyMessageConverter())
		return factory
	}

	fun getCustomConnectionFactory() : ConnectionFactory {
		return ...
	}

}

然后,您可以在任何带有 @RabbitListener 注解的方法中使用该工厂,如下所示:spring-doc.cadn.net.cn

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
	fun processMessage(content: String?) {
		// ...
	}

}

您可以启用重试以处理您的监听器抛出异常的情况。 默认情况下,使用 RejectAndDontRequeueRecoverer,但您可以定义自己的 MessageRecoverer。 当重试用完后,消息将被拒绝,并且如果代理配置为这样做,则会被丢弃或路由到死信交换。 默认情况下,重试处于禁用状态。 您还可以通过声明一个 RabbitListenerRetrySettingsCustomizer bean 来以编程方式自定义 RetryPolicyspring-doc.cadn.net.cn

默认情况下,如果禁用了重试且监听器抛出异常,则会无限期地重试投递。 你可以通过两种方式修改此行为:将 defaultRequeueRejected 属性设置为 false,以便不尝试重新投递;或者抛出一个 AmqpRejectAndDontRequeueException 以指示应拒绝该消息。 后者是在启用重试并达到最大投递尝试次数时使用的机制。