该版本仍在开发中,尚未被视为稳定。对于最新的稳定版本,请使用 Spring Integration 7.0.0spring-doc.cadn.net.cn

SFTP 入站通道适配器

SFTP 入站通道适配器是一个特殊的监听器,连接服务器并监听远程目录事件(如新文件创建),随后发起文件传输。 以下示例展示了如何配置SFTP入站通道适配器:spring-doc.cadn.net.cn

<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
              session-factory="sftpSessionFactory"
            channel="requestChannel"
            filename-pattern="*.txt"
            remote-directory="/foo/bar"
            preserve-timestamp="true"
            local-directory="file:target/foo"
            auto-create-local-directory="true"
            local-filename-generator-expression="#this.toUpperCase() + '.a'"
            scanner="myDirScanner"
            local-filter="myFilter"
            temporary-file-suffix=".writing"
            max-fetch-size="-1"
            delete-remote-files="false">
        <int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>

前面的配置示例展示了如何为各种属性提供值,包括以下内容:spring-doc.cadn.net.cn

默认情况下,传输的文件名称与原始文件相同。 如果你想覆盖这个行为,可以设置本地文件名生成器表达式属性,允许你提供一个 SpEL 表达式来生成本地文件的名称。 与出站网关和适配器不同,后者的 SpEL 评估上下文的根对象是消息该入站适配器在评估时尚未获得该消息,因为最终生成的消息是它以传输文件为有效载荷。 因此,SpEL 评估上下文的根对象是远程文件的原始名称(a字符串).spring-doc.cadn.net.cn

入站通道适配器首先将文件检索到本地目录,然后根据轮询器配置发送每个文件。 从5.0版本开始,你可以限制在需要新文件检索时从SFTP服务器获取的文件数量。 当目标文件体积较大,或在带有持久文件列表过滤器的集群系统中运行时,这非常有用,后面将在本节讨论。 用最大取取大小为此目的。 负值(默认值)表示无限制,所有匹配文件都会被检索。 更多信息请参见入站通道适配器:控制远程文件取用。 从5.0版本开始,你也可以提供自定义目录扫描器实现到......入站信道适配器通过设置扫描器属性。spring-doc.cadn.net.cn

从 Spring Integration 3.0 开始,你可以指定保留时间戳属性(默认为false). 什么时候true,本地文件修改的时间戳设置为从服务器获取的值。 否则,时间会被设置为当前。spring-doc.cadn.net.cn

从4.2版本开始,你可以指定远程目录表达式而不是远程目录,这允许你动态确定每次轮询的目录——例如,remote-directory-expression=“@myBean.determineRemoteDir()”.spring-doc.cadn.net.cn

有时,文件过滤基于以下简单模式指定文件名模式属性可能不够。 如果是这样,你可以使用文件名正则表达式属性以指定正则表达式(例如,Filename-regex=“.*\.test$”). 如果你需要完全控制,可以使用Filter属性以提供对org.springframework.integration.file.filters.FileListFilter,这是一种用于过滤文件列表的策略界面。 该过滤器决定检索哪些远程文件。 你也可以将基于图案的Filter与其他Filter结合使用(例如AcceptOnceFileListFilter,以避免同步之前被抓取的文件)通过使用CompositeFileListFilter.spring-doc.cadn.net.cn

AcceptOnceFileListFilter将状态存储在内存中。 如果你希望该状态在系统重启后存活,可以考虑使用SftpPersistentAcceptOnceFileListFilter相反。 该过滤器将被接受的文件名存储在元数据存储策略(参见元数据存储)。 该过滤器在文件名和远程修改时间上匹配。spring-doc.cadn.net.cn

自4.0版本起,该Filter要求并发元数据存储. 当与共享数据存储一起使用时(例如:Redis其中RedisMetadataStore这使得过滤键可以在多个应用或服务器实例间共享。spring-doc.cadn.net.cn

从5.0版本开始,SftpPersistentAcceptOnceFileListFilter内存中的SimpleMetadataStore默认应用于SftpInboundFileSynchronizer. 该Filter也被应用,同时正则表达式模式XML 配置中的选项,以及通过SftpInboundChannelAdapterSpec在Java DSL中。 你可以通过以下方式处理其他用例CompositeFileListFilter(或ChainFileListFilter).spring-doc.cadn.net.cn

上述讨论指的是在检索文件前先过滤。 文件检索后,会对文件系统中的文件施加额外过滤器。 默认情况下,这是“AcceptOnceFileListFilter”,如本节所述,它保留内存状态,不考虑文件的修改时间。 除非你的应用程序在处理后删除文件,否则适配器默认会在应用重启后重新处理磁盘上的文件。spring-doc.cadn.net.cn

另外,如果你配置了Filter使用SftpPersistentAcceptOnceFileListFilter且远程文件时间戳发生变化(导致重新取用),默认的本地过滤器不允许处理该新文件。spring-doc.cadn.net.cn

有关该过滤器的更多信息及其使用方式,请参见远程持久文件列表过滤器spring-doc.cadn.net.cn

你可以使用局部Filter属性来配置本地文件系统过滤器的行为。 从4.3.8版本开始,FileSystemPersistentAcceptOnceFileListFilter是默认配置的。 该过滤器将接受的文件名和修改后的时间戳存储在元数据存储策略(参见元数据存储)并检测本地文件修改时间的变化。 默认元数据存储SimpleMetadataStore它将状态存储在内存中。spring-doc.cadn.net.cn

自4.1.5版本起,这些过滤器新增了一个性质,称为flushOn更新,这导致它们冲洗 每次更新时的元数据存储(如果存储实现了可冲洗).spring-doc.cadn.net.cn

此外,如果你使用分布式元数据存储(比如Redis元数据存储),你可以拥有多个同一个适配器或应用程序的实例,并确保只有一个实例处理一个文件。

实际的局部Filter是CompositeFileListFilter该模块包含提供的过滤器和一个模式过滤器,防止处理正在下载的文件(基于临时文件后缀). 文件下载时带有这个后缀(默认是。写作),传输完成后文件会被重命名为最终名称,使它们对过滤器“可见”。spring-doc.cadn.net.cn

有关这些属性的详细内容,请参见示范式spring-doc.cadn.net.cn

SFTP的入站通道适配器是一个轮询消费者。 因此,你必须配置一个轮询器(无论是全局默认还是本地元素)。 一旦文件被传输到本地目录,会发送一条消息java.io.file由于其有效载荷类型被生成并发送到由渠道属性。spring-doc.cadn.net.cn

从6.2版本开始,你可以根据最后修改策略过滤SFTP文件,使用以下方式SftpLastModifiedFileListFilter. 该Filter可配置为年龄该属性使得过滤器只传递比该值更早的文件。 默认年龄为60秒,但你应选择足够大的年龄,以避免因网络故障等原因提前获取文件。 可以查看它的Java文档了解更多信息。spring-doc.cadn.net.cn

更多关于文件过滤和大文件的内容

有时,刚出现在监控(远程)目录中的文件是不完整的。 通常,这样的文件会带有某种临时扩展名(例如。写作在一个名为something.txt.写作),然后在写作过程完成后重新命名。 在大多数情况下,开发者只对完整的文件感兴趣,并希望只过滤这些文件。 为了处理这些情况,你可以使用以下文件名模式,文件名正则表达式Filter属性。 如果你需要自定义Filter实现,可以通过设置Filter属性。 以下示例展示了如何实现:spring-doc.cadn.net.cn

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
            channel="receiveChannel"
            session-factory="sftpSessionFactory"
            filter="customFilter"
            local-directory="file:/local-test-dir"
            remote-directory="/remote-test-dir">
        <int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>

<bean id="customFilter" class="org.foo.CustomFilter"/>

从失败中恢复

你应该了解适配器的架构。 文件同步器获取文件,并且文件阅读消息源为每个同步文件发送一条消息。 如前所述,涉及两个过滤器。 这Filter属性(和模式)指的是远程(SFTP)文件列表,以避免取出已经取过的文件。 这文件阅读消息源使用局部Filter以确定哪些文件将作为消息发送。spring-doc.cadn.net.cn

同步器会列出远程文件并查阅其过滤器。 文件随后被转移。 如果文件传输过程中发生 IO 错误,已添加到过滤器中的文件会被移除,以便在下一次轮询时有资格重新获取。 这仅适用于Filter实现可逆文件列表过滤器(例如AcceptOnceFileListFilter).spring-doc.cadn.net.cn

如果在文件同步后,处理文件的下游流程发生错误,则不会自动回滚过滤器,因此失败的文件默认不会被重新处理。spring-doc.cadn.net.cn

如果你希望在失败后重新处理此类文件,可以使用类似以下配置,以便将失败文件从过滤器中移除:spring-doc.cadn.net.cn

<int-sftp:inbound-channel-adapter id="sftpAdapter"
        session-factory="sftpSessionFactory"
        channel="requestChannel"
        remote-directory-expression="'/sftpSource'"
        local-directory="file:myLocalDir"
        auto-create-local-directory="true"
        filename-pattern="*.txt">
    <int:poller fixed-rate="1000">
        <int:transactional synchronization-factory="syncFactory" />
    </int:poller>
</int-sftp:inbound-channel-adapter>

<bean id="acceptOnceFilter"
    class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>

<bean id="transactionManager"
    class="org.springframework.integration.transaction.PseudoTransactionManager" />

上述配置适用于任意ResettableFileListFilter.spring-doc.cadn.net.cn

从5.0版本开始,入站通道适配器可以根据生成的本地文件名在本地构建子目录。 这也可以是远程子路径。 为了能够递归地读取本地目录以便根据层级支持修改,你现在可以提供内部目录文件阅读消息源递归目录扫描器基于Files.walk()算法。 看摘要InboundFileSynchronizingMessageSource.setScanner()更多信息请见。 另外,你现在可以切换摘要InboundFile同步消息源前往守望服务-基于目录扫描器通过setUseWatchService()选择。 它也为所有观察事件类型实例对本地目录中的任何修改进行反应。 之前展示的重处理样本基于文件阅读信息源.WatchService目录扫描器,其使用方式ResettableFileListFilter.remove()当文件被删除时(StandardWatchEventKinds.ENTRY_DELETE)来自本地目录。 看WatchService目录扫描器更多信息请见。spring-doc.cadn.net.cn

使用 Java 配置配置

以下 Spring Boot 应用程序展示了如何用 Java 配置入站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(port);
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        factory.setTestSession(true);
        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source =
                new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("sftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(1);
        return source;
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用 Java DSL 配置

以下 Spring Boot 应用程序展示了如何用 Java DSL 配置入站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow sftpInboundFlow() {
        return IntegrationFlow
            .from(Sftp.inboundAdapter(this.sftpSessionFactory)
                    .preserveTimestamp(true)
                    .remoteDirectory("foo")
                    .regexFilter(".*\\.txt$")
                    .localFilenameExpression("#this.toUpperCase() + '.a'")
                    .localDirectory(new File("sftp-inbound")),
                 e -> e.id("sftpInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(5000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }
}

处理不完整数据

SftpSystemMarkerFilePresentFileListFilter提供用于过滤远程系统中没有相应标记文件的远程文件。 有关配置信息,请参见 Javadocspring-doc.cadn.net.cn