参考指南
本指南介绍了RabbitMQ实现的Spring Cloud Stream Binder。 其中包含其设计、使用和配置选项的信息,以及 Stream Cloud Stream 概念如何映射到 RabbitMQ 的特定构造中。
1. 用途
要使用 RabbitMQ 绑定器,您可以使用以下 Maven 坐标将其添加到您的 Spring Cloud Stream 应用中:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
或者,您也可以使用Spring Cloud Stream RabbitMQ Starter,具体如下:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2. RabbitMQ Binder 概述
以下简化图展示了RabbitMQ绑定器的作方式:
默认情况下,RabbitMQ Binder 的实现会将每个目的地映射到话题交流.
对于每个消费者群体,一个队列与 绑定话题交流.
每个消费者实例都有对应的RabbitMQ消费者其组的实例队列.
对于分区生产者和消费者,队列后缀为分区索引,并使用分区索引作为路由键。
对于匿名消费者(那些没有群属性),使用自动删除队列(带有随机唯一名称)。
通过使用可选的autoBindDlq选项中,你可以配置绑定器创建和配置死号队列(DLQ)(以及死号交换)DLX以及路由基础设施)。
默认情况下,死字母队列包含目的名称,并附加.dlq.
如果启用了重试 (最大尝试次数> 1),失败消息在重试用尽后传递给DLQ。
如果重试被禁用 (最大尝试次数 = 1),你应该设置requeueRejected自false(默认)这样失败的消息会被路由到DLQ,而不是重新排队。
另外转载ToDlq导致绑定器向DLQ发布失败消息(而非拒绝)。
该功能允许额外信息(例如栈迹在x-exception-栈追踪在邮件中添加首部。
参见框架MaxHeadroom(框架最大头量)属性关于截断栈迹的信息。
这个选项不需要启用重试。
你只需尝试一次,就能重新发布失败消息。
从1.2版本开始,你可以配置重发布消息的传递方式。
参见republishDeliveryMode属性.
如果流监听器抛出ImmediateAcknowledgeAmqpException,DLQ被绕过,消息被直接丢弃。
从2.1版本开始,无论 设置如何,这都成立转载ToDlq;之前只有在转载ToDlq是false.
设置requeueRejected自true(其中republishToDlq=false) 会导致消息不断重新排队和重新投递,除非失败原因是暂时性的,否则这很可能不是你想要的。
一般来说,你应该在绑定器里通过设置来启用重试最大尝试次数通过设置,大于一或转载ToDlq自true. |
有关这些属性的更多信息,请参见RabbitMQ活板属性。
该框架没有提供任何标准机制来消耗死信(或将其重新路由回主队列)。 部分选项在死字队列处理中有所描述。
当 Spring Cloud Stream 应用中使用多个 RabbitMQ 绑定器时,关闭“RabbitAutoConfiguration”以避免出现相同的配置非常重要兔子自动配置被贴在两本活页夹上。
你可以通过以下方式排除该类@SpringBootApplication注解。 |
从2.0版本开始,兔子消息频道绑定器设置RabbitTemplate.userPublisherConnection属性到true这样非事务生产者就能避免消费者的死锁,比如缓存连接因代理的内存警报而被阻断时。
目前,多重消费者(单个消费者监听多个队列)仅支持消息驱动消费者;被轮询的消费者只能从单个队列中获取消息。 |
3. 配置选项
本节包含针对RabbitMQ绑定器和绑定通道的具体设置。
有关一般的绑定配置选项和属性,请参见 Spring Cloud Stream 核心文档。
3.1. RabbitMQ Binder 属性
默认情况下,RabbitMQ 绑定器使用 Spring Boot 的连接工厂.
它支持RabbitMQ的所有Spring Boot配置选项。
(参考,请参见 Spring Boot 文档。)
RabbitMQ 配置选项使用Spring.rabbitmq前缀。
除了 Spring Boot 选项外,RabbitMQ 绑定器还支持以下属性:
- spring.cloud.stream.rabbit.binder.adminAddresses
-
一个逗号分隔的RabbitMQ管理插件URL列表。 仅用于
节点包含多个条目。 该列表中的每个条目都必须对应一个spring.rabbitmq.addresses. 只有在使用 RabbitMQ 集群并希望从托管队列节点消费时才需要。 更多信息请参见队列亲和力和本地化队列连接工厂。默认:空。
- Spring.cloud.stream.rabbit.binder.nodes
-
RabbitMQ 节点名称的逗号分隔列表。 当有多个条目时,用于定位队列所在的服务器地址。 该列表中的每个条目都必须对应一个
spring.rabbitmq.addresses. 只有在使用 RabbitMQ 集群并希望从托管队列节点消费时才需要。 更多信息请参见队列亲和力和本地化队列连接工厂。默认:空。
- spring.cloud.stream.rabbit.binder.compressionLevel
-
压缩绑定的压缩水平。 看
java.util.zip.德弗莱特.违约:
1(BEST_LEVEL)。 - spring.cloud.stream.binder.connection-name-prefix
-
连接名称前缀用于命名该绑订器创建的连接。 名称为前缀,后面跟着
#n哪里n每次开启新连接时,数据都会递增。默认:无(Spring AMQP默认)。
3.2. RabbitMQ 消费者属性
以下房产仅供Rabbit用户使用,且必须在前缀前加spring.cloud.stream.rabbit.bindings.<channelName>.consumer..
然而,如果大多数绑定需要应用相同的性质集合,则
避免重复,Spring Cloud Stream支持为所有频道设置数值,
格式为spring.cloud.stream.rabbit.default.<property>=<value>.
另外,请记住,绑定特定属性会覆盖默认情况下的对应属性。
- 承认模式
-
确认模式。
违约:
自动. - 匿名组前缀
-
当绑定没有
群属性是一个匿名的自动删除队列,绑定到目的地交换。 此类队列的默认命名算法会生成一个名为匿名.<base64的UUID表示>. 将此属性设置为将前缀改为非默认的。违约:
匿名。. - autoBindDlq
-
是否要自动声明DLQ并将其绑定到绑定器DLX上。
违约:
false. - 绑定路由键
-
用来绑定队列到交换机的路由密钥(如果
绑定队列是true). 可以有多个键——参见bindingRoutingKeyDelimiter. 对于分区的目的地,-<实例索引>附加在每个键上。违约:。
# - bindingRoutingKeyDelimiter
-
当这不是空时,'bindingRoutingKey' 被视为由该值分隔的键列表;通常会使用逗号。
违约:
零. - 绑定队列
-
是否声明队列并将其绑定到目的交换机。 设置为
false如果你已经搭建了自己的基础设施,并且之前已经创建并绑定了队列。违约:
true. - consumerTagPrefix
-
用于创建消费者标签;将附于
#n哪里n每个消费者创建的增量。 例:${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}.默认:无——经纪人会生成随机的消费者标签。
- containerType
-
选择要使用的监听器容器类型。 更多信息请参见春季AMQP文档中的“选择容器”。
违约:
简单 - deadLetterQueueName
-
DLQ的名称
违约:
prefix+destination.dlq - 死信交换
-
一个DLX来分配到队列。 只有在
autoBindDlq是true.默认:'前缀+DLX'
- 死信交换类型
-
分配到队列的DLX类型。 只有在
autoBindDlq是true.默认:'直接'
- deadLetterRoutingKey
-
一个死信路由密钥来分配到队列。 只有在
autoBindDlq是true.违约:
目的地 - declareDlx
-
是否申报目的地的空信交换。 只有在
autoBindDlq是true. 设置为false如果你有预配置的DLX。违约:
true. - 声明交换
-
是否要宣布目的地的交换。
违约:
true. - 延迟交换
-
是否应声明该交换为
延迟消息交换. 需要经纪商上的延迟消息交换插件。 这X-延迟型参数设置为exchange类型.违约:
false. - dlqBindingArguments
-
在将DLQ绑定于死信交换时适用的论点;与
头死信交换类型指定匹配的头部。 例如…dlqBindingArguments.x-match=any,…dlqBindingArguments.someHeader=someValue.默认:空
- dlqDeadLetterExchange(死信交换)
-
如果声明了DLQ,则分配一个DLX到该队列。
违约:
没有 - dlqDeadLetterRoutingKey
-
如果声明了DLQ,则会分配一个死字母路由密钥到该队列。
违约:
没有 - dlq过期
-
未使用的死信队列被删除前多久(毫秒级)。
违约:
无过期 - dlq懒惰
-
声明死信队列,使用
x-queue-mode=lazy论点。 参见“懒散排队”。 考虑用策略代替这个设置,因为使用策略可以在不删除队列的情况下更改设置。违约:
false. - dlq最大长度
-
死信队列中最大消息数量。
违约:
没有限制 - dlqMaxLengthBytes
-
所有消息中死字母队列中总字节数的最大。
违约:
没有限制 - DLQMax优先级
-
死信队列中消息的最大优先级(0-255)。
违约:
没有 - dlq溢出行为
-
应采取的行动
dlq最大长度或dlqMaxLengthBytes超过;现在下垂头或拒绝-发布但请参考RabbitMQ文档。违约:
没有 - dlqQuorum.deliveryLimit(限量)
-
什么时候
quorum.enabled=true设置投递限制,超过此后消息被丢弃或写入死字母。违约:无——将适用经纪人违约。
- dlqQuorum.enabled
-
如果为真,创建法定人数死信队列,而不是传统的队列。
默认:false
- dlqQuorum.initialQuorumSize
-
什么时候
quorum.enabled=true,设定初始的法定人数。违约:无——将适用经纪人违约。
- dlq单一活跃消费者
-
设置为 true,以设置
x-单一-主动-消费者队列属性转为true。违约:
false - dlqTtl
-
默认的生存时间在声明时应用到死字母队列(以毫秒为单位)。
违约:
没有限制 - 耐用订阅
-
订阅是否应该是持久的。 只有当
群也已设定。违约:
true. - 交换自动删除
-
如果
声明交换是否应自动删除交换(即在最后一个队列被移除后删除)。违约:
true. - 交换耐用
-
如果
声明交换是否应当保持持久性(即经纪商重启后存活)为真。违约:
true. - exchange类型
-
交换类型:
直接,扇形,头或主题对于非分区目的和直接,头部或主题用于分区目的地。违约:
主题. - 独家
-
是否要创建一个专属消费者。 当
true. 通常用于需要严格命令但能让热备份实例在故障后接管。 看恢复间隔,控制备用实例尝试消费的频率。 考虑使用单一活跃消费者而是在使用 RabbitMQ 3.8 或更高版本时。违约:
false. - 到期
-
未使用的队列被删除前多久(以毫秒计)。
违约:
无过期 - failedDeclarationRetryInterval
-
如果队列中缺少,则使用尝试消耗之间的间隔(以毫秒计)。
默认值:5000
- 框架MaxHeadroom(框架最大头量)
-
在将栈迹添加到DLQ消息头部时,应保留给其他头部的字节数。 所有头部都必须符合
frame_max经纪人已配置尺寸。 栈线可以很大;如果 的大小加上该性质frame_max然后栈跟踪会被截断。 将会记录WARN日志;考虑增加frame_max或者通过捕捉异常并抛弃一个栈轨迹较小的异常来减少栈跟踪。默认值:20000
- 头部模式
-
从入站消息映射的头部模式。
默认:(所有头部)。
['*'] - 懒惰
-
声明队列
x-queue-mode=lazy论点。 参见“懒散排队”。 考虑用策略代替这个设置,因为使用策略可以在不删除队列的情况下更改设置。违约:
false. - max并发
-
最大数量的消费者。 当
containerType是直接.违约:
1. - 最大长度
-
队列中消息的最大数量。
违约:
没有限制 - maxLengthBytes
-
所有消息中队列中总字节的最大数。
违约:
没有限制 - maxPriority
-
队列中消息的最大优先级(0-255)。
违约:
没有 - 缺失队列致命
-
当找不到队列时,是否应将该条件视为致命并停止监听器容器。 默认
false这样容器就会不断尝试从队列中获取数据——例如,当使用集群且托管非 HA 队列的节点宕机时。违约:
false - 溢出行为
-
应采取的行动
最大长度或maxLengthBytes超过;现在下垂头或拒绝-发布但请参考RabbitMQ文档。违约:
没有 - 预取
-
预取计数。
违约:
1. - 前缀
-
在名称后加上一个前缀
目的地还有排队。默认:“。
- queueBindingArguments
-
参数在绑定队列到交换时应用;与
头exchange类型指定匹配的头部。 例如…queueBindingArguments.x-match=any,…queueBindingArguments.someHeader=someValue.默认:空
- queueDeclarationRetries
-
如果队列中缺少,需要重新尝试消耗的次数。 仅在当时相关
缺失队列致命是true. 否则,容器会无限期地重试。 当containerType是直接.违约:
3 - 仅 queueNameGroupOnly
-
当为真时,从一个队列中取出的 ,其名称等于
群. 否则,队列名称为目的地.group. 例如,当使用 Spring Cloud Stream 从现有的 RabbitMQ 队列中进行消费时,这非常有用。默认:false。
- quorum.deliveryLimit(法定人数)限制
-
什么时候
quorum.enabled=true设置投递限制,超过此后消息被丢弃或写入死字母。违约:无——将适用经纪人违约。
- quorum.enabled
-
当成立时,创建法定人数队列,而非经典队列。
默认:false
- quorum.initialQuorumSize
-
什么时候
quorum.enabled=true,设定初始的法定人数。违约:无——将适用经纪人违约。
- 恢复间隔
-
连接恢复尝试之间的间隔,以毫秒计。
违约:
5000. - requeueRejected
-
当重试被禁用时,是否应重新排队传输失败
转载ToDlq是false.违约:
false.
- republishDeliveryMode
-
什么时候
转载ToDlq是true,指定重发布消息的传递方式。违约:
DeliveryMode.PERSISTENT - 转载ToDlq
-
默认情况下,重试用尽后失败的消息会被拒绝。 如果配置了死号队列(DLQ),RabbitMQ会将未更改的失败消息路由到DLQ。 如果设置为
true,绑定器会通过额外的头部(包括异常消息和最终失败原因的栈追踪)重新发布到DLQ。 另见 frameMaxHeadroom 属性。默认:false
- 单一活跃消费者
-
设置为 true,以设置
x-单一-主动-消费者队列属性转为true。违约:
false - 交易
-
是否使用交易通道。
违约:
false. - TTL
-
默认的存活时间在声明时应用到队列(以毫秒为单位)。
违约:
没有限制 - txSize
-
每一次攻击之间的投球次数。 当
containerType是直接.违约:
1.
3.3. 高级监听器容器配置
要设置未被暴露为绑定或绑定属性的监听器容器属性,只需添加一个类型的ListenerContainerCustomizer切换到应用上下文。
绑定器和绑定属性会被设置好,然后调用自定义器。
定制器(configure()方法)以队列名称和消费者组作为参数提供。
3.4. 高级队列/交换/绑定配置
RabbitMQ 团队不时添加新功能,这些功能通过在声明队列时设置参数来启用。
通常,这些功能可以通过添加适当的属性在绑定器中启用,但当前版本可能无法立即提供。
从3.0.1版本开始,你可以添加DeclarableCustomizer对应用上下文进行 BEAN(s)修改可宣告 (队列,交换或捆绑)就在宣告执行前。
这样你就可以添加那些目前未被活页夹直接支持的论据。
3.5. 接收批处理消息
通常,如果生产者有约束batch-enabled=true(参见兔子生产者属性),或者消息由批量处理兔子模板批次中的元素以单独调用的形式返回到监听器方法。
从3.0版本开始,任何此类批次都可以作为名单<?>如果 到 监听者方法spring.cloud.stream.bindings.<name>.consumer.batch-mode设置为true.
3.6. 兔子制作人
以下属性仅供Rabbit生产者使用,且必须以spring.cloud.stream.rabbit.bindings.<channelName>.producer..
然而,如果大多数绑定需要应用相同的性质集合,则
避免重复,Spring Cloud Stream支持为所有频道设置数值,
格式为spring.cloud.stream.rabbit.default.<property>=<value>.
另外,请记住,绑定特定属性会覆盖默认情况下的对应属性。
- autoBindDlq
-
是否要自动声明DLQ并将其绑定到绑定器DLX上。
违约:
false. - batchingEnabled
-
是否启用生产商的消息批处理。 消息根据以下属性(在本列表中接下来的三条目中描述)被批量整理成一条消息:“batchSize”,
batchBufferLimit(批处理缓冲限制)和batchTimeout. 更多信息请参见批处理。 另见“接收批量消息”。违约:
false. - 批量大小
-
启用批处理时需要缓冲的消息数量。
违约:
100. - batchBufferLimit(批处理缓冲限制)
-
启用批处理时的最大缓冲区大小。
违约:
10000. - batchTimeout
-
开启批处理时的批处理超时。
违约:
5000. - 绑定路由键
-
用来绑定队列到交换机的路由密钥(如果
绑定队列是true). 可以有多个键——参见bindingRoutingKeyDelimiter. 对于分区的目的地,-n附加在每个键上。 仅适用于必要组仅提供给这些群体。违约:。
# - bindingRoutingKeyDelimiter
-
当这不是空时,'bindingRoutingKey' 被视为由该值分隔的键列表;通常会使用逗号。 仅适用于
必要组仅提供给这些群体。违约:
零. - 绑定队列
-
是否声明队列并将其绑定到目的交换机。 设置为
false如果你已经搭建了自己的基础设施,并且之前已经创建并绑定了队列。 仅适用于必要组仅提供给这些群体。违约:
true. - 压缩
-
发送数据时是否应压缩。
违约:
false. - 确认Ack频道
-
什么时候
errorChannelEnabled是真的,这是一个发送正面送达确认(即出版商确认)的渠道。 如果该信道不存在,则直达频道注册时使用该名称。 连接工厂必须配置为启用发布者确认。违约:
零通道(确认被丢弃)。 - deadLetterQueueName
-
DLQ的名称 仅适用于
必要组仅提供给这些群体。违约:
prefix+destination.dlq - 死信交换
-
一个DLX来分配到队列。 仅在当时相关
autoBindDlq是true. 仅适用于必要组仅提供给这些群体。默认:'前缀+DLX'
- 死信交换类型
-
分配到队列的DLX类型。 只有在
autoBindDlq是true. 仅适用于必要组仅提供给这些群体。默认:'直接'
- deadLetterRoutingKey
-
一个死信路由密钥来分配到队列。 仅在当时相关
autoBindDlq是true. 仅适用于必要组仅提供给这些群体。违约:
目的地 - declareDlx
-
是否申报目的地的空信交换。 只有在
autoBindDlq是true. 设置为false如果你有预配置的DLX。 仅适用于必要组仅提供给这些群体。违约:
true. - 声明交换
-
是否要宣布目的地的交换。
违约:
true. - 延迟表达式
-
一个 SpEL 表达式用于评估应用到消息上的延迟(
X延迟页首)。 如果交换不是延迟消息交换,则无效。默认:否
X延迟标头已设置。 - 延迟交换
-
是否应声明该交换为
延迟消息交换. 需要经纪商上的延迟消息交换插件。 这X-延迟型参数设置为exchange类型.违约:
false. - 传递模式
-
传递方式。
违约:
持续. - dlqBindingArguments
-
在将DLQ绑定于死信交换时适用的论点;与
头死信交换类型指定匹配的头部。 例如…dlqBindingArguments.x-match=any,…dlqBindingArguments.someHeader=someValue. 仅适用于必要组仅提供给这些群体。默认:空
- dlqDeadLetterExchange(死信交换)
-
当 DLQ 声明时,会为该队列分配一个 DLX。 仅适用于
必要组仅提供给这些群体。违约:
没有 - dlqDeadLetterRoutingKey
-
当宣告 DLQ 时,会分配一个死符路由密钥到该队列。 仅适用于
必要组仅提供给这些群体。违约:
没有 - dlq过期
-
多久(毫秒)会删除未使用的死字母队列。 仅适用于
必要组仅提供给这些群体。违约:
无过期 - dlq懒惰
-
声明死信队列,使用
x-queue-mode=lazy论点。 参见“懒散排队”。 考虑用策略代替这个设置,因为使用策略可以在不删除队列的情况下更改设置。 仅适用于必要组仅提供给这些群体。 - dlq最大长度
-
死信队列中最大消息数量。 仅适用于
必要组仅提供给这些群体。违约:
没有限制 - dlqMaxLengthBytes
-
所有消息中死字母队列中总字节数的最大。 仅适用于
必要组仅提供给这些群体。违约:
没有限制 - DLQMax优先级
-
死信队列中消息的最大优先级(0-255) 仅适用于
必要组仅提供给这些群体。违约:
没有 - dlqQuorum.deliveryLimit(限量)
-
什么时候
quorum.enabled=true设置投递限制,超过此后消息被丢弃或写入死字母。 仅适用于必要组仅提供给这些群体。违约:无——将适用经纪人违约。
- dlqQuorum.enabled
-
如果为真,创建法定人数死信队列,而不是传统的队列。 仅适用于
必要组仅提供给这些群体。默认:false
- dlqQuorum.initialQuorumSize
-
什么时候
quorum.enabled=true,设定初始的法定人数。 仅适用于必要组仅提供给这些群体。违约:无——将适用经纪人违约。
- dlq单一活跃消费者
-
设置为 true,以设置
x-单一-主动-消费者队列属性转为true。 仅适用于必要组仅提供给这些群体。违约:
false - dlqTtl
-
默认的存活时间(以毫秒计)在声明时应用到死符队列。 仅适用于
必要组仅提供给这些群体。违约:
没有限制 - 交换自动删除
-
如果
声明交换是true,是否应自动删除交换(在最后一个队列被移除后自动删除)。违约:
true. - 交换耐用
-
如果
声明交换是true,交易所是否应具备持久性(经纪商重启后存活)。违约:
true. - exchange类型
-
交换类型:
直接,扇形,头或主题对于非分区目的和直接,头或主题用于分区目的地。违约:
主题. - 到期
-
未使用的队列被删除前需要多长时间(毫秒)。 仅适用于
必要组仅提供给这些群体。违约:
无过期 - 头部模式
-
将头映射到外发消息的模式。
默认:(所有头部)。
['*'] - 懒惰
-
声明队列
x-queue-mode=lazy论点。 参见“懒散排队”。 考虑用策略代替这个设置,因为使用策略可以在不删除队列的情况下更改设置。 仅适用于必要组仅提供给这些群体。违约:
false. - 最大长度
-
队列中最大消息数量。 仅适用于
必要组仅提供给这些群体。违约:
没有限制 - maxLengthBytes
-
所有消息队列中总字节的最大数值。 仅适用于
必要组仅提供给这些群体。违约:
没有限制 - maxPriority
-
队列中消息的最大优先级(0-255)。 仅适用于
必要组仅提供给这些群体。违约:
没有 - 前缀
-
在名称后加上一个前缀
目的地交换。默认:“。
- queueBindingArguments
-
参数在绑定队列到交换时应用;与
头exchange类型指定匹配的头部。 例如…queueBindingArguments.x-match=any,…queueBindingArguments.someHeader=someValue. 仅适用于必要组仅提供给这些群体。默认:空
- 仅 queueNameGroupOnly
-
什么时候
true,从一个名为 的队列中取用群. 否则,队列名称为目的地.group. 例如,当使用 Spring Cloud Stream 从现有的 RabbitMQ 队列中进行消费时,这非常有用。 仅适用于必要组仅提供给这些群体。默认:false。
- quorum.deliveryLimit(法定人数)限制
-
什么时候
quorum.enabled=true设置投递限制,超过此后消息被丢弃或写入死字母。 仅适用于必要组仅提供给这些群体。违约:无——将适用经纪人违约。
- quorum.enabled
-
当成立时,创建法定人数队列,而非经典队列。 仅适用于
必要组仅提供给这些群体。默认:false
- quorum.initialQuorumSize
-
什么时候
quorum.enabled=true,设定初始的法定人数。 仅适用于必要组仅提供给这些群体。违约:无——将适用经纪人违约。
- 路由键表达式
-
一个用于确定发布消息时使用的路由密钥的SpEL表达式。 对于固定路由密钥,可以使用字面表达式,例如:
routingKeyExpression='my.routingKey'在属性文件中或routingKeyExpression: ''my.routingKey'''在一个YAML文件中。违约:
目的地或目的<分区>用于分区目的地。 - 单一活跃消费者
-
设置为 true,以设置
x-单一-主动-消费者队列属性转为true。 仅适用于必要组仅提供给这些群体。违约:
false - 交易
-
是否使用交易通道。
违约:
false. - TTL
-
默认的存活时间(毫秒)在宣告时应用到队列中。 仅适用于
必要组仅提供给这些群体。违约:
没有限制
| 在RabbitMQ的情况下,内容类型头部可以由外部应用程序设置。 Spring Cloud Stream 作为扩展内部协议的一部分支持它们,适用于任何类型的传输——包括如 Kafka(0.11 之前)原生不支持头部的传输。 |
4. 使用现有队列/交换
默认情况下,绑定器会自动设置主题交换,名称来源于目标绑定属性的值<前缀><目的地>.
如果未提供,目的地默认为绑定名称。
绑定消费者时,队列会自动以该名称配置<prefix><destination>.<group>(如果 a群绑定属性被指定),或者在没有 时使用匿名自动删除队列群.
对于非分区绑定,队列将被绑定到交换局,使用“匹配全”通配符路由密钥()或#<destination>-<instanceIndex>对于分区绑定。
前缀为空字符串默认。
如果输出绑定被指定为必要组,每个组都会预约一个队列/绑定。
有许多兔子专用的绑定属性允许你修改这种默认行为。
如果你有现有的交易所/队列想使用,可以完全关闭自动配置,前提是交换所已被命名我的交换队列被命名为我的队列:
-
spring.cloud.stream.bindings.<binding name>.destination=myExhange -
spring.cloud.stream.bindings.<binding name>.group=myQueue -
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindQueue=false -
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.declareExchange=false -
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.queueNameGroupOnly=true
如果你想让绑定器来配置队列/交换,但你想用这里讨论的默认值以外的方式来实现,请使用以下属性。 更多信息请参阅上方的物业文件。
-
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey -
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.exchangeType=<type> -
spring.cloud.stream.rabbit.bindings.<binding name>.producer.routingKeyExpression='myRoutingKey'
在声明死单字交换/队列时,也有类似的属性,当autoBindDlq是true.
5. 用RabbitMQ文件夹重试
当绑定器内启用重试时,监听器容器线程会暂停,持续设置后退时间。 当需要对单个消费者进行严格订购时,这一点可能很重要。然而,在其他使用场景中,它会阻止该线程中处理其他消息。 使用活页夹重试的另一种方法是设置死符队列(DLQ)中的死字母,同时在DLQ本身设置死符配置。 有关此处讨论的属性的更多信息,请参见“RabbitMQ Binder Properties”。 您可以使用以下示例配置来启用此功能:
-
设置
autoBindDlq自true. 活页夹会创建一个DLQ。 可选地,你可以在deadLetterQueueName. -
设置
dlqTtl回到你想在两次重新送货之间等待的退货时间。 -
设置
dlqDeadLetterExchange(死信交换)回到默认交换。 DLQ的过期消息会被路由到原始队列,因为默认情况下deadLetterRoutingKey是队列名称(目的地.group). 设置为默认交换是通过将属性设置为无值来实现的,如下一个示例所示。
要强制消息为死字母,可以抛出AmqpRejectAndDontRequeueException或集合requeueRejected自true(默认)并抛出任意例外。
循环是无尽的,这对瞬态问题来说没问题,但尝试几次后你可能想放弃。
幸运的是,RabbitMQ提供了X-死亡头部,可以让你判断已经发生了多少个周期。
放弃后要确认消息,可以抛出ImmediateAcknowledgeAmqpException.
5.1. 将一切整合起来
以下配置创建交换我的目的地带队列myDestination.consumerGroup绑定到带有通配符路由密钥的主题交换:#
---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---
这种配置创建了一个绑定到直接交换的DLQ(DLX) 的路由键为myDestination.consumerGroup.
当消息被拒绝时,它们会被路由到DLQ。
5秒后,消息失效,并通过使用队列名称作为路由密钥路由到原始队列,如下示例所示:
@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {
public static void main(String[] args) {
SpringApplication.run(XDeathApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
if (death != null && death.get("count").equals(3L)) {
// giving up - don't send to DLX
throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
}
throw new AmqpRejectAndDontRequeueException("failed");
}
}
注意计数性质X-死亡头是长.
6. 错误信道
从版本1.3开始,绑定器无条件向每个消费者目的地的错误通道发送异常,并且可以配置为向错误通道发送异步生产者发送失败。 更多信息请参见“[spring-cloud-stream-overview-error-handling]”。
RabbitMQ 有两种发送失败类型:
-
回复消息,
-
负面致谢,出版商确认。
后者较为罕见。 根据RabbitMQ文档,“[nack]只有在负责队列的Erlang进程发生内部错误时才会被交付。”
除了启用生产者错误通道(如“[spring-cloud-stream-overview-error-handling]”中描述的),RabbitMQ 绑定器仅在连接工厂配置适当的情况下向通道发送消息,具体如下:
-
ccf.set出版者确认(真实); -
ccf.set出版商返回(真实);
在使用 Spring Boot 配置连接工厂时,请设置以下属性:
-
Spring。rabbitmq.publisher-confirms -
Spring.兔子问答。出版者-归来
有效载荷错误消息对于返回消息是返回AmqpMessageException具有以下性质:
-
失败消息:春季信息留言<?>但那份文件未能发送。 -
amqpMessage:原始的春季AMQP消息. -
回复代码:一个整数值表示失败原因(例如,312 - 无路由)。 -
回复正文:一个表示失败原因的文本值(例如,NO_ROUTE). -
交换:该信息发布的交流。 -
路由键:消息发布时使用的路由密钥。
对于负向确认,有效载荷为NackedAmqpMessageException具有以下性质:
-
失败消息:春季信息留言<?>但那份文件未能发送。 -
纳克理性:一个原因(如果有的话——您可能需要查看经纪人日志以获取更多信息)。
这些异常(例如发送到死信队列)没有自动处理。 你可以用自己的 Spring 集成流程来调用这些例外。
7. 死符队列处理
由于无法预见用户如何处理死字母消息,该框架没有提供任何标准处理机制。
如果死字母出现的原因暂时性,您可能希望将消息路由回原始队列。
但如果问题是永久性的,可能会导致无限循环。
以下 Spring Boot 应用展示了如何将这些消息路由回原始队列的示例,但在尝试三次后将消息移至第三个“停车场”队列。
第二个例子使用RabbitMQ延迟消息交换,为重新排队的消息引入延迟。
在这个例子中,每次尝试延迟都会增加。
这些例子使用@RabbitListener接收来自DLQ的信息。
你也可以用RabbitTemplate.receive()批量处理。
示例假设原始目的地为SO8400英寸而消费者组为SO8400.
7.1. 非分区目的地
前两个例子适用于目的节点未分割时:
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String DELAY_EXCHANGE = "dlqReRouter";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
headers.put("x-delay", 5000 * retriesHeader);
this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public DirectExchange delayExchange() {
DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
exchange.setDelayed(true);
return exchange;
}
@Bean
public Binding bindOriginalToDelay() {
return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
7.2. 分区目的地
对于分区目标,所有分区都有一个DLQ。我们从报头确定原始队列。
7.2.1.republishToDlq=false
什么时候转载ToDlq是falseRabbitMQ 通过X-死亡包含原始目的地信息的头部,如下示例所示:
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_DEATH_HEADER = "x-death";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@SuppressWarnings("unchecked")
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
String exchange = (String) xDeath.get(0).get("exchange");
List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
7.2.2.republishToDlq=true
什么时候转载ToDlq是true,重发布恢复器将原始交换和路由密钥添加到头部,如下示例所示:
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
8. 用RabbitMQ绑定器进行分区
RabbitMQ原生不支持分区。
有时,将数据发送到特定分区是有利的——例如,当你想严格订购消息处理时,特定客户的所有消息都应发送到同一个分区。
这兔子消息频道绑定器通过为每个分区绑定一个队列到目的交换机,实现分区。
以下 Java 和 YAML 示例展示了如何配置生产者:
@SpringBootApplication
@EnableBinding(Source.class)
public class RabbitPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"abc1", "def1", "qux1",
"abc2", "def2", "qux2",
"abc3", "def3", "qux3",
"abc4", "def4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
.web(false)
.run(args);
}
@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
public Message<?> generate() {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
}
}
spring:
cloud:
stream:
bindings:
output:
destination: partitioned.destination
producer:
partitioned: true
partition-key-expression: headers['partitionKey']
partition-count: 2
required-groups:
- myGroup
|
前述示例中的配置使用默认的分区( 这 |
以下配置提供了主题交换功能:
以下队列绑定到该交换:
以下绑定将队列关联到交换:
以下 Java 和 YAML 示例延续了之前的示例,展示了如何配置消费者:
@SpringBootApplication
@EnableBinding(Sink.class)
public class RabbitPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@StreamListener(Sink.INPUT)
public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
System.out.println(in + " received from queue " + queue);
}
}
spring:
cloud:
stream:
bindings:
input:
destination: partitioned.destination
group: myGroup
consumer:
partitioned: true
instance-index: 0
这兔子消息频道绑定器不支持动态缩放。
每个分区至少必须有一个消费者。
消费者的实例索引用于表示被消耗的分区。
像Cloud Foundry这样的平台只能有一个实例,且实例索引. |