对于最新稳定版本,请使用 Spring Integration 7.0.0spring-doc.cadn.net.cn

关于非阻塞输入输出(NIO)

使用蔚来(参见用尼奥IP 配置属性中)避免了为每个套接字单独读取线程。 对于少量套接字,你很可能会发现不使用 NIO,同时采用异步切换(例如队列通道),性能与使用NIO相当甚至更好。spring-doc.cadn.net.cn

处理大量连接时,你应该考虑使用蔚来。 然而,使用NIO还有其他影响。 任务执行程序中的线程池在所有套接字间共享。 每个进来消息被组装并作为从该线程池中选定的线程上的独立工作单元发送到配置通道。 两个顺序消息到达同一套接字时,可能由不同的线程处理。 这意味着消息发送到信道的顺序是不确定的。 到达套接字的消息没有严格顺序。spring-doc.cadn.net.cn

对于某些应用来说,这并不是问题。 对另一些人来说,这反而是个问题。 如果你需要严格的排序,可以考虑设置用尼奥false并采用异步切换。spring-doc.cadn.net.cn

或者,你可以在入站端点下游插入重序器,将消息恢复到正确的顺序。 如果你设置了应用序列true在连接工厂,抵达TCP连接的消息有序列号关联Id头部已设置。 重序器利用这些头部将消息返回到正确的顺序。spring-doc.cadn.net.cn

从5.1.4版本开始,优先接受新连接,而非从现有连接读取。 除非你有非常高的新连接进站率,否则这通常影响不大。 如果你想恢复到之前赋予读取优先级的行为,可以设置多接受属性TcpNioServerConnectionFactoryfalse.

泳池规模

池大小属性不再使用。 此前,当未指定任务执行器时,它会指定默认线程池的大小。 它还被用来设置服务器套接字的连接积压。 第一个功能已不再需要(见下一段)。 第二个函数被储备属性。spring-doc.cadn.net.cn

此前,使用固定线程池任务执行器(为默认配置)配合NIO时,可能会出现死锁,导致处理停止。 问题发生在缓冲区满了,线程从套接字读取时试图向缓冲区添加更多数据,且缓冲区内没有线程可用来腾出空间。 这种情况只发生在非常小的水池中,但在极端条件下是有可能的。 自2.2版本起,有两项改进消除了这个问题。 首先,默认的任务执行器是一个缓存的线程池执行器。 其次,加入了死锁检测逻辑,如果线程发生饥饿,而不是死锁,而是抛出异常,从而释放死锁资源。spring-doc.cadn.net.cn

现在默认任务执行器是无界的,如果消息处理时间延长,可能会在高速率的消息中出现内存不足状态。 如果你的应用表现出这种行为,你应该使用池池大小合适的池任务执行器,但请见下一节

线程池任务执行器CALLER_RUNS政策

使用固定线池时,你应注意一些重要因素CallerRunsPolicy (CALLER_RUNS当使用<任务/>命名空间)且队列容量较小。spring-doc.cadn.net.cn

如果你没有使用固定线程池,以下内容不适用。spring-doc.cadn.net.cn

NIO连接分为三种不同的任务类型。 I/O 选择器处理在一个专用线程上执行(检测事件、接受新连接,并通过任务执行器将 I/O 读取作派遣到其他线程)。 当一个I/O读片线程(读取作被派遣到)读取数据时,它会交接给另一个线程来组装收到的消息。 大型消息可能需要多次读取才能完成。 这些“汇编”线程可以在等待数据时阻塞。 当发生新的读取事件时,读取器判断该套接字是否已有汇编器,如果没有,则运行新的套接字。 组装过程完成后,汇编线程返回到组。spring-doc.cadn.net.cn

当池子耗尽时,这可能导致僵局,CALLER_RUNS拒绝策略正在使用,任务队列已满。 当池空且队列无空间时,IO选择线程接收到OP_READ事件并通过执行程序发送读取。 队列已满,选择线程本身启动读取过程。 现在它检测到该套接字没有汇编器,在读取前,启动汇编器。 同样,队列已满,选择线程变成汇编器。 汇编器现在被阻塞,等待数据被读取,但从未发生过。 连接工厂现在死锁了,因为选择线程无法处理新事件。spring-doc.cadn.net.cn

为了避免这种死结,我们必须避免执行汇编任务的选择线程(或读取线程)。 我们希望为 IO 和组装作使用不同的池。spring-doc.cadn.net.cn

该框架提供了合成执行者,允许配置两个不同的执行程序:一个用于执行 IO作,另一个用于消息汇编。 在此环境中,IO 线程永远无法成为汇编线程,死锁也无法发生。spring-doc.cadn.net.cn

此外,任务执行器应配置为使用中止政策 (流产使用<任务>). 当输入输出任务无法完成时,会被推迟一段时间并不断重试,直到能够完成并分配汇编器。 默认延迟是100毫秒,但你可以通过设置readDelay连接工厂上的性质(读取延迟配置XML命名空间时)。spring-doc.cadn.net.cn

以下三个示例展示了如何配置复合执行器:spring-doc.cadn.net.cn

@Bean
private CompositeExecutor compositeExecutor() {
    ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
    ioExec.setCorePoolSize(4);
    ioExec.setMaxPoolSize(10);
    ioExec.setQueueCapacity(0);
    ioExec.setThreadNamePrefix("io-");
    ioExec.setRejectedExecutionHandler(new AbortPolicy());
    ioExec.initialize();
    ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
    assemblerExec.setCorePoolSize(4);
    assemblerExec.setMaxPoolSize(10);
    assemblerExec.setQueueCapacity(0);
    assemblerExec.setThreadNamePrefix("assembler-");
    assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
    assemblerExec.initialize();
    return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg ref="io"/>
    <constructor-arg ref="assembler"/>
</bean>

<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="io-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="8" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="assembler-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="10" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
</bean>