该版本仍在开发中,尚未被视为稳定。对于最新的稳定版本,请使用 Spring Integration 7.0.0spring-doc.cadn.net.cn

AMQP支持的消息频道

有两种消息通道实现。 一个是点对点,另一个是发布-订阅。 这两个通道都为底层的配置属性提供了广泛的Amqp模板SimpleMessageListenerContainer(如本章前述关于通道适配器和网关的说明所示)。 然而,我们这里展示的例子配置非常有限。 探索XML模式以查看可用的属性。spring-doc.cadn.net.cn

点对点信道可能如下示例:spring-doc.cadn.net.cn

<int-amqp:channel id="p2pChannel"/>

在盖子下,前述例子导致队列si.p2pChannel该通道发送给队列(严格来说,是通过将与该交换机名称匹配的路由密钥发送到无名直接交换机队列). 该信道还注册了一个消费者队列. 如果你希望信道是“可轮询”的,而不是消息驱动的,请提供信息驱动值为false,如下示例所示:spring-doc.cadn.net.cn

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>

一个发布订阅频道可能如下:spring-doc.cadn.net.cn

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

在被子里,前面的例子引发了一个名为si.fanout.pub子频道该频道将发送到该扇形交换。 该通道还声明了一个服务器名的专属、自动删除、非持久通道队列并在注册用户时将其绑定到扇出交换机队列接收信息。 发布-订阅-频道没有“可轮询”选项。 它必须是信息驱动的。spring-doc.cadn.net.cn

从4.1版本开始,AMQP支持的消息通道(配合通道交易) 支持模板通道交易分离事务配置用于摘要MessageListenerContainer和 对于兔子模板. 请注意,之前,通道交易true默认。 现在,默认情况下,它就是false对于摘要MessageListenerContainer.spring-doc.cadn.net.cn

在4.3版本之前,AMQP支持的频道仅支持消息序列 化有效载荷和头部。 整个消息被转换(序列化)并发送到 RabbitMQ。 现在,你可以设置提取物-有效载荷属性(或setExtractPayload()使用 Java 配置时)变成true. 当这面旗帜是true,消息有效载荷被转换并映射报头,方式类似于使用通道适配器。 这种安排使得AMQP支持的信道可以与不可序列化的有效载荷一起使用(可能还包括其他消息转换器,例如:Jackson2JsonMessageConverter). 有关默认映射头的更多信息,请参见AMQP消息头。 你可以通过提供使用以下条件的自定义映射器来修改映射出站头映射器入站头映射器属性。 你现在也可以指定默认投递模式当 没有 时,用于设置传递模式amqp_deliveryMode页眉。 默认情况下,Spring AMQP消息属性使用持续交付模式。spring-doc.cadn.net.cn

与其他持久性支持信道一样,AMQP支持信道旨在提供消息持久性,以避免消息丢失。 它们不打算将工作分发给其他同类应用。 为此,建议使用通道适配器。
从5.0版本开始,可轮询通道现在会屏蔽指定轮询线程收到超时(默认为1秒)。 之前,与其他人不同Pollable频道实现中,如果没有消息可用,线程会立即返回调度器,无论接收超时。 定型比用basicGet()为了获取消息(无超时),因为必须创建一个消费者来接收每条消息。 要恢复之前的行为,请设置轮询器的收到超时到0。

使用 Java 配置配置

以下示例展示了如何用 Java 配置配置通道:spring-doc.cadn.net.cn

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(false);
    return factoryBean;
}

使用 Java DSL 配置

以下示例展示了如何使用Java DSL配置信道:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}