该版本仍在开发中,尚未被视为稳定。对于最新的稳定版本,请使用 Spring Integration 7.0.0spring-doc.cadn.net.cn

FTP 流式入站通道适配器

4.3 版本引入了流式入站通道适配器。该适配器产生带有 类型的有效载荷的消息输入流,允许在不写入本地文件系统的情况下获取文件。由于会话保持开放,耗用应用程序负责在文件完成后关闭会话 消耗。 该会话提供于可关闭资源头部 (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE). 标准框架组件,例如文件分拆器StreamTransformer, 自动关闭会话。有关这些组件的更多信息,请参见文件分区器和流变换器。以下示例展示了如何配置入站流通道适配器:spring-doc.cadn.net.cn

<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>

只有其中一个文件名模式,文件名正则表达式,Filter滤波表达式是允许的。spring-doc.cadn.net.cn

从5.0版本开始,默认情况下,FtpStreamingMessageSource适配器防止远程文件的重复,满足条件FtpPersistentAcceptOnceFileListFilter基于内存SimpleMetadataStore. 默认情况下,这个过滤器也会与文件名模式(或正则表达式)一起应用。如果你需要允许重复,可以使用AcceptAllFileListFilter. 其他任何用例都可以由CompositeFileListFilter(或ChainFileListFilter). 文档后面的 Java 配置展示了一种处理后移除远程文件以避免重复的技术。

欲了解更多关于FtpPersistentAcceptOnceFileListFilter,及其使用方式,请参见远程持久文件列表过滤器spring-doc.cadn.net.cn

使用该最大取取大小属性限制每次轮询中获取文件数量,当需要获取时。设置为1在集群环境中运行时使用持久过滤器。更多信息请参见入站通道适配器:控制远程文件获取spring-doc.cadn.net.cn

适配器将远程目录和文件名放入FileHeaders.REMOTE_DIRECTORYFileHeaders.REMOTE_FILE分别是头部。从5.0版本开始,FileHeaders.REMOTE_FILE_INFO头部提供额外的远程文件信息(默认以 JSON 表示)。如果您设置了fileInfoJson属性FtpStreamingMessageSourcefalse,头部包含一个FtpFileInfo对象。 这FTPFile可通过以下方式访问底层Apache Net库提供的对象FtpFileInfo.getFileInfo()方法。 这fileInfoJson使用 XML 配置时无法使用属性,但你可以通过注入FtpStreamingMessageSource进入你的配置类之一。另见远程文件信息spring-doc.cadn.net.cn

从5.1版本开始,通用类型比较仪FTPFile. 以前,它是抽象文件信息<FTPFile>. 这是因为排序现在在处理的早期阶段进行,在筛选和应用之前maxFetch.spring-doc.cadn.net.cn

使用 Java 配置配置

以下 Spring Boot 应用程序展示了如何用 Java 配置配置入站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("ftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public FtpRemoteFileTemplate template() {
        return new FtpRemoteFileTemplate(ftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}

注意,在这个例子中,变换器下游的消息处理程序有建议处理后会移除远程文件。spring-doc.cadn.net.cn