该版本仍在开发中,尚未被视为稳定。对于最新的稳定版本,请使用 Spring Integration 7.0.0spring-doc.cadn.net.cn

JDBC消息库

Spring Integration 提供两种 JDBC 专用的消息存储实现。 这Jdbc消息存储适合用于聚合器和理赔检查模式。 这Jdbc频道消息存储实现为消息通道提供了更有针对性和可扩展性的实现。spring-doc.cadn.net.cn

注意你可以使用Jdbc消息存储支持消息通道,Jdbc频道消息存储为此目的进行了优化。spring-doc.cadn.net.cn

从5.0.11、5.1.2版本开始,索引Jdbc频道消息存储已经优化过。 如果你在这样的商店里有大型消息组,可能需要修改索引。 此外,的指标优先频道注释被剔除,因为除非你使用由JDBC支持的此类频道,否则不需要注释。
当使用OracleChannelMessageStoreQueryProvider,必须添加优先通道索引,因为它包含在查询中的提示中。

数据库初始化

在开始使用 JDBC 消息存储组件之前,你应该先配置一个包含相应对象的目标数据库。spring-doc.cadn.net.cn

Spring Integration 附带了一些示例脚本,可用于初始化数据库。 在Spring-integration-JDBCJAR文件中,你可以在org.springframework.integration.jdbc包。 它为多种常见数据库平台提供了示例创建脚本和示例丢弃脚本。 使用这些脚本的常见方式是在 Spring JDBC 数据源初始化工具中引用它们。 请注意,这些脚本作为示例和所需表和列名的规格提供。 你可能会发现需要为生产用途增强它们(例如添加索引声明)。spring-doc.cadn.net.cn

从6.2版本开始,Jdbc消息存储,Jdbc频道消息存储,Jdbc元数据存储默认锁定仓库实现SmartLifecycle并在各自的表中执行“SELECT COUNT”查询,开始()方法确保目标数据库中存在所需的表(根据提供的前缀)。 如果不存在所需的表,应用上下文将无法启动。 该检查可以通过以下方式关闭setCheckDatabaseOnStart(false).spring-doc.cadn.net.cn

通用JDBC消息库

JDBC模块实现了Spring集成消息商店(在理赔检查模式中很重要)MessageGroupStore(在有状态模式如聚合器中很重要)由数据库支持。 这两个接口均由Jdbc消息存储并且支持用XML配置存储实例,如下示例所示:spring-doc.cadn.net.cn

<int-jdbc:message-store id="messageStore" data-source="dataSource"/>

你可以指定一个Jdbc模板而不是数据来源.spring-doc.cadn.net.cn

以下示例展示了一些其他可选属性:spring-doc.cadn.net.cn

<int-jdbc:message-store id="messageStore" data-source="dataSource"
    lob-handler="lobHandler" table-prefix="MY_INT_"/>

在前面的例子中,我们指定了一个LobHandler用于处理消息作为大型对象(这对 Oracle 来说通常是必要的),以及存储生成的查询中表名称的前缀。 表名前缀默认为INT_.spring-doc.cadn.net.cn

备份消息通道

如果你打算用JDBC支持消息频道,我们建议使用Jdbc频道消息存储实现。 它仅与消息通道配合使用。spring-doc.cadn.net.cn

支持的数据库

Jdbc频道消息存储使用数据库特定的SQL查询从数据库中检索消息。 因此,你必须设置ChannelMessageStoreQueryProvider属性Jdbc频道消息存储. 这channelMessageStoreQueryProvider为你指定的特定数据库提供SQL查询。 Spring Integration 支持以下关系型数据库:spring-doc.cadn.net.cn

如果你的数据库没有列出,你可以实现ChannelMessageStoreQueryProvider可以进行界面并提供自定义查询。spring-doc.cadn.net.cn

4.0版本增加了MESSAGE_SEQUENCE确保即使消息存储在同一毫秒内,也能实现先进先出(FIFO)排队。spring-doc.cadn.net.cn

从6.2版本开始,ChannelMessageStoreQueryProvider揭露了isSingleStatementForPoll旗帜,其中PostgresChannelMessageStoreQueryProvider返回true其民调查询现基于单一删除。。。返回陈述。 这Jdbc频道消息存储isSingleStatementForPoll选项并跳过了单独的删除如果只支持单一民调陈述,则该陈述。spring-doc.cadn.net.cn

自定义消息插入

自5.0版本起,通过超载ChannelMessageStorePreparedStatementSetter类中,你可以在Jdbc频道消息存储. 你可以用它设置不同的列,或者更改表结构或序列化策略。 例如,默认序列化为字节[]你可以将其结构存储为 JSON 字符串。spring-doc.cadn.net.cn

以下示例使用默认实现集合值用于存储常见列,并覆盖将消息有效载荷存储为瓦尔查尔:spring-doc.cadn.net.cn

public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {

    @Override
    public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
        Object groupId, String region, 	boolean priorityEnabled) throws SQLException {
        // Populate common columns
        super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
        // Store message payload as varchar
        preparedStatement.setString(6, requestMessage.getPayload().toString());
    }
}

一般来说,我们不建议使用关系型数据库进行排队。 如果可能,建议使用JMS或AMQP支持的信道。 如需进一步参考,请参见以下资源:spring-doc.cadn.net.cn

如果你仍打算将数据库用作队列,可以考虑使用 PostgreSQL 及其通知机制,后面会详细说明spring-doc.cadn.net.cn

并行投票

在轮询消息通道时,你可以选择配置关联的轮询器其中任务执行者参考。spring-doc.cadn.net.cn

不过请记住,如果你使用JDBC支持的消息通道,并且计划轮询该通道及其后果的消息存储事务,包含多个线程,你应确保使用支持多版本并发控制(MVCC)的关系数据库。 否则,可能会有锁住的问题,使用多线程时性能可能无法如预期实现。 例如,阿帕奇德比在这方面存在问题。spring-doc.cadn.net.cn

为了实现更好的JDBC队列吞吐量,避免不同线程轮询相同数据时出现问题消息从队列中,设置使用IdCache的属性Jdbc频道消息存储true当使用不支持 MVCC 的数据库时, 以下示例展示了如何实现:spring-doc.cadn.net.cn

<bean id="queryProvider"
    class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="10"
    queue-capacity="10" rejection-policy="CALLER_RUNS" />

<bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="TX_TIMEOUT"/>
    <property name="usingIdCache" value="true"/>
</bean>

<int:channel id="inputChannel">
    <int:queue message-store="store"/>
</int:channel>

<int:bridge input-channel="inputChannel" output-channel="outputChannel">
    <int:poller fixed-delay="500" receive-timeout="500"
        max-messages-per-poll="1" task-executor="pool">
        <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
        isolation="READ_COMMITTED" transaction-manager="transactionManager" />
    </int:poller>
</int:bridge>

<int:channel id="outputChannel" />

优先频道

从4.0版本开始,Jdbc频道消息存储实现优先能力通道消息存储并提供优先启用选择权,允许它作为消息存储参考文献优先队列实例。 为此,INT_CHANNEL_MESSAGE表有MESSAGE_PRIORITY列 以存储 的值优先权消息头。此外,还有一个新的MESSAGE_SEQUENCE列使我们能够实现稳健的先进先出(FIFO)轮询机制,即使在同一毫秒内存储多个消息以相同优先级存储。消息从数据库中轮询(选择)为MESSAGE_PRIORITY DESC 的顺序是最后、CREATED_DATE、MESSAGE_SEQUENCE.spring-doc.cadn.net.cn

我们不建议使用相同的产品Jdbc频道消息存储优先级和非优先级队列通道的 BEAN 是因为优先启用选项适用于整个存储,且队列通道不保留正确的先进先出队列语义。然而,情况相同INT_CHANNEL_MESSAGE桌面(甚至地区)可以同时用于这两种情况Jdbc频道消息存储类型。 要配置该场景,你可以从一个消息存储豆扩展到另一个,如下示例所示:
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

消息存储的分区

通常会使用Jdbc消息存储作为同一应用中一组应用或节点的全局存储。为了防止名称冲突并控制数据库元数据配置,消息存储允许表以两种方式分区。一种方式是通过更改前缀(如前所述)使用独立的表名。另一种方式是指定地区在单一表中分割数据的名称。第二种方法的一个重要用例是当消息商店管理支持Spring集成消息通道的持久队列。持久通道的消息数据在通道名称的存储中被锁定。因此,如果通道名称不全局唯一,通道可能会获取非其意图的数据。为避免此风险,您可以使用消息存储地区为了让不同物理通道的数据分开,这些通道具有相同的逻辑名称。spring-doc.cadn.net.cn

PostgreSQL:接收推送通知

PostgreSQL 提供了一个监听和通知框架,用于在数据库表作时接收推送通知。Spring Integration 利用该机制(从 6.0 版本开始)允许在新增消息到Jdbc频道消息存储. 使用此功能时,必须定义数据库触发器,该触发器可在schema-postgresql.sql该文件包含在 Spring Integration 的 JDBC 模块中。spring-doc.cadn.net.cn

推送通知通过PostgresChannelMessageTableSubscriber该类允许其用户在收到任何给定新消息时收到回调地区组ID. 即使消息被附加在不同的JVM上,但发送到同一数据库,这些通知也会被接收。 这PostgresSubscribableChannel(订阅频道)实现方式为PostgresChannelMessageTableSubscriber.Subscription合同中会对上述通知的反应从商店拉取消息PostgresChannelMessageTableSubscriber通知。spring-doc.cadn.net.cn

例如,推送通知某个群体可以按以下方式接收:spring-doc.cadn.net.cn

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
    messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
    return messageStore;
}

@Bean
public PostgresChannelMessageTableSubscriber subscriber(
      @Value("${spring.datasource.url}") String url,
      @Value("${spring.datasource.username}") String username,
      @Value("${spring.datasource.password}") String password) {
    return new PostgresChannelMessageTableSubscriber(() ->
        DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}

@Bean
public PostgresSubscribableChannel channel(
    PostgresChannelMessageTableSubscriber subscriber,
    JdbcChannelMessageStore messageStore) {
  return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}

从6.0.5版本开始,指定了PlatformTransactionManagerPostgresSubscribableChannel(订阅频道)会通知交易中的订阅者。订阅者中的异常会导致交易被回滚,消息会被重新放回消息存储。事务支持默认不会被激活。spring-doc.cadn.net.cn

从6.0.5版本开始,重试策略可以通过提供重试模板前往PostgresSubscribableChannel(订阅频道). 默认情况下,不进行重试。spring-doc.cadn.net.cn

任何活跃的PostgresChannelMessageTableSubscriber占据了独家的JDBC连接在其活跃生命周期内。因此,重要的是这种连接不应源自一个池化数据来源. 此类连接池通常期望已发出的连接在预设超时窗口内关闭。spring-doc.cadn.net.cn

为了实现独占连接的需求,也建议JVM只运行一个连接PostgresChannelMessageTableSubscriber该系统可用来注册任意数量的订阅。spring-doc.cadn.net.cn