|
该版本仍在开发中,尚未被视为稳定。对于最新的稳定版本,请使用 Spring Integration 7.0.0! |
读取文件
一个文件阅读消息源可以用来消耗文件系统中的文件。
这是一个实现消息源它从文件系统目录中生成消息。
以下示例展示了如何配置文件阅读消息源:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>
为了防止为某些文件创建消息,你可以提供文件列表过滤器.
默认情况下,我们使用以下过滤器:
-
IgnoreHiddenFileListFilter -
AcceptOnceFileListFilter
这IgnoreHiddenFileListFilter确保隐藏文件不会被处理。
注意,隐藏的确切定义是系统相关的。
例如,在基于UNIX的系统中,以句点字符开头的文件被视为隐藏文件。
而 Microsoft Windows 则有一个专门的文件属性来表示隐藏文件。
|
4.2 版本引入了 |
这AcceptOnceFileListFilter确保文件只从目录中取取一次。
|
这 自4.0版本起,该Filter要求 自4.1.5版本起,该Filter新增了一个性质( |
持久文件列表过滤器现在具有布尔属性递归.
将该属性设置为true,也设alwaysAccept目录,这意味着对出站网关的递归作(LS和MGET)现在每次都会遍历整个目录树。
这是为了解决目录树深层变更未被检测到的问题。
另外forRecursion=真使得文件的完整路径被用作元数据存储键;这解决了当同名文件在不同目录中多次出现时,过滤器无法正常工作的问题。
重要提示:这意味着在持久的元数据存储中,无法找到顶层目录以下文件的现有密钥。
因此,该性质为false默认情况下;未来发布可能会有所变化。
以下示例配置为文件阅读消息源带Filter:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
读取文件时的一个常见问题是,文件可能在尚未准备好之前就被检测到(即其他进程可能仍在写入该文件)。
默认AcceptOnceFileListFilter但这并不能阻止这一点。
在大多数情况下,如果文件写入过程在文件准备好读取时立即重命名,这种情况可以避免。
一个文件名模式或文件名正则表达式过滤器只接受已准备好的文件(可能基于已知后缀),并以默认AcceptOnceFileListFilter,允许这种情况。
这CompositeFileListFilter使得该复合成为可能,如下例子所示:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
如果无法用临时名称创建文件并重命名为最终名称,Spring Integration 提供了另一种选择。
4.2 版本增加了LastModifiedFileListFilter.
该Filter可配置为年龄该属性使得过滤器只传递比该值更早的文件。
默认年龄为60秒,但你应选择足够大的年龄,以避免因网络故障等原因提前获取文件。
以下示例展示了如何配置LastModifiedFileListFilter:
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
从版本4.3.7开始,ChainFileListFilter(扩展CompositeFileListFilter)被引入以允许后续Filter只看到前一Filter的结果。
(其中CompositeFileListFilter,所有过滤器都能看到所有文件,但只传递通过所有过滤器的文件)。
需要新行为的一个例子是以下组合:LastModifiedFileListFilter和AcceptOnceFileListFilter当我们希望在一定时间内不接受该文件时。
与CompositeFileListFilter,因为AcceptOnceFileListFilter第一次通过时会看到所有文件,之后被另一个过滤器通过时它就不会通过。
这CompositeFileListFilter当模式过滤器与寻找次级文件以表示文件传输完成的自定义过滤器结合时,这种方法非常有用。
模式过滤器可能只通过主文件(例如something.txt但“完成”过滤器需要查看(例如)做了什么存在。
假设我们有文件a.txt,完成和b.txt.
模式Filter只通过a.txt和b.txt而“完成”过滤器只能看到这三个文件并通过a.txt.
复合Filter的最终结果仅为a.txt被释放。
与ChainFileListFilter如果链中任一过滤器返回空列表,则其余过滤器不会被调用。 |
5.0 版本引入了ExpressionFileListFilter作为上下文评估根对象,对文件执行 SpEL 表达式。
为此,所有用于文件处理的XML组件(本地和远程)以及现有的Filter属性,已被提供滤波表达式选项,如下示例所示:
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
5.0.5 版本引入了DiscardAwareFileListFilter那些对被拒绝文件感兴趣的实现。
为此,应通过以下方式提供回调。addDiscardCallback(Consumer<File>).
在框架中,该功能来自文件阅读信息源.WatchService目录扫描器,结合LastModifiedFileListFilter.
与普通的不一样目录扫描器这守望服务根据目标文件系统上的事件,提供处理文件。
在轮询包含这些文件的内部队列时,LastModifiedFileListFilter可能会因为它们相对于配置而言太年轻而被丢弃年龄.
因此,我们失去了未来可能考虑的档案。
丢弃回调钩让我们能够将文件保留在内部队列中,以便与年龄在后续的民调中。
这CompositeFileListFilter同时实现了DiscardAwareFileListFilter并填充一个丢弃回调,映射到其所有DiscardAwareFileListFilter代表。
因为CompositeFileListFilter将文件与所有代表匹配,discard回呼同一文件可能会多次调用。 |
从5.1版本开始,文件阅读消息源它不会检查目录的存在性,也不会在它开始()称为(通常通过包裹)SourcePollingChannelAdapter).
此前,没有简单的方法可以防止在引用目录时(例如测试时)或后续应用权限时发生作系统权限错误。
消息头
从5.0版本开始,文件阅读消息源(除了有效载荷作为民调文件)填充以下出站的头部消息:
-
FileHeaders.FILENAME:这File.getName()文件要发送。 可用于后续的重命名或复制逻辑。 -
FileHeaders.ORIGINAL_FILE:这文件物体本身。 通常,当我们丢失原始节点时,这个头部会自动被框架组件(如分流器或变换器)填充文件对象。 不过,为了与其他自定义用例保持一致性和便利性,这个头部有助于访问原始文件。 -
FileHeaders.RELATIVE_PATH: 新增了用于表示文件路径相对于扫描根目录的部分。 当需要恢复其他地方的源目录层级时,这个头部非常有用。 为此,DefaultFileNameGenerator(参见“'生成文件名”)可以配置为使用此头部。
目录扫描与轮询
这文件阅读消息源不会立即为目录中的文件发送消息。
它使用内部队列来存储由扫描器.
这scanEachPoll选项用于确保每次轮询中内部队列更新为最新的输入目录内容。
默认情况下(scanEachPoll = false),文件阅读消息源清空队列后再次扫描目录。
这种默认行为对于减少目录中大量文件的扫描尤其有用。
然而,在需要自定义排序的情况下,必须考虑将该标志设置为true.
文件处理的顺序可能与预期不同。
默认情况下,队列中的文件会以自然处理方式(路径)秩序。
通过扫描添加的新文件,即使队列中已有文件,也会入到相应位置以保持自然顺序。
为了自定义订单,文件阅读消息源可以接受比较器<文件>作为构造子论元。
它被内部(优先阻挡队列)根据业务需求重新排序内容。
因此,为了按特定顺序处理文件,你应该提供一个比较器文件阅读消息源而不是按习惯顺序列出列表目录扫描器.
5.0 版本推出递归目录扫描器以执行文件树访问。
实现基于Files.walk(路径开始,int maxDepth,FileVisitOption...选项)功能性。
根目录(DirectoryScanner.listFiles(文件))的论证被排除在结果之外。
所有其他子目录的包含和排除均基于目标文件列表过滤器实现。
例如,SimplePatternFileListFilter默认过滤目录。
看AbstractDirectoryAwareFileListFilter以及其实现以获取更多信息。
从5.5版本开始,FileInboundChannelAdapterSpecJava DSL 的递归(布尔)选择使用递归目录扫描器在目标中文件阅读消息源而不是默认的。 |
命名空间支持
通过使用文件特定命名空间,可以简化文件读取的配置。为此,可以使用以下模板:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
在这个命名空间内,你可以将文件阅读消息源并用入站通道适配器包裹,具体如下:
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
第一个通道适配器示例依赖默认文件列表过滤器实现:
-
IgnoreHiddenFileListFilter(不处理隐藏文件) -
AcceptOnceFileListFilter(防止重复)
因此,你也可以省略防止重复和忽略隐藏属性,正如它们本身true默认。
|
Spring Integration 4.2 引入了 |
第二个信道适配器示例使用自定义过滤器,第三个示例使用文件名模式属性以添加一个蚁径匹配器基于Filter,第四个则使用文件名正则表达式属性中添加基于正则表达式模式的Filter文件阅读消息源.
这文件名模式和文件名正则表达式属性与正则 互斥Filter参考属性。不过,你可以使用Filter属性用于引用 的实例CompositeFileListFilter它结合了多种过滤器,包括一个或多个基于图案的过滤器,以满足您的具体需求。
当多个进程同时读取同一个目录时,你可能需要锁定文件,以防止它们同时被拾取。为此,你可以使用文件锁. 有一个Java.nio基于 的实现已提供,但也可以实现自己的锁定方案。 这蔚来储物柜的注入方式如下:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
你可以按以下方式配置自定义储物柜:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件入站适配器配置了锁器时,它负责在文件被允许接收前先获得锁。它不承担解锁文件的责任。如果你已经处理了文件并保持锁悬挂,说明存在内存泄漏。如果这是个问题,你应该联系FileLocker.unlock(文件文件)在合适的时间,你自己。 |
当过滤和锁定文件不足时,你可能需要完全控制文件的列表方式。要实现这种要求,可以使用以下实现目录扫描器. 这个扫描仪可以让你准确确定每个轮询中列出了哪些文件。这也是 Spring Integration 内部用来布线的接口文件列表过滤器实例和文件锁前往文件阅读消息源. 你可以注入自定义目录扫描器进入<int-file:inbound-channel-adapter/>在扫描器属性,如下例所示:
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
这样做可以让你完全自由选择排序、列表和锁定策略。
还需要理解过滤器(包括模式,正则表达式,防止重复以及其他)柜子实例实际上被扫描器. 适配器上设置的任意属性随后会被注入内部扫描器. 对于外部扫描器,所有过滤器和锁柜属性均禁止在文件阅读消息源. 必须在该自定义中指定(如有需要)目录扫描器. 换句话说,如果你注入一个扫描器进入文件阅读消息源你应该提供Filter和柜子关于这个扫描器,不在文件阅读消息源.
默认情况下,默认目录扫描器使用IgnoreHiddenFileListFilter以及AcceptOnceFileListFilter. 为了防止它们的使用,你可以配置自己的过滤器(例如AcceptAllFileListFilter甚至设置为零. |
WatchService目录扫描器
这文件阅读信息源.WatchService目录扫描器当新文件添加到目录时,依赖于文件系统事件。初始化过程中,目录被注册生成事件。初始化过程中也会构建初始文件列表。在浏览目录树时,遇到的任何子目录也会被注册生成事件。在第一次轮询中,返回从目录中获得的初始文件列表。在后续轮询中,返回来自新创建事件的文件。如果添加了新子目录,其创建事件用于移动新子树以查找现有文件并注册发现的任何新子目录。
这里有个问题WatchKey(观测钥匙)当其内部事件发生时队列程序不会像目录修改事件发生时那样快速耗尽 。
如果队列大小超过,则StandardWatchEventKinds.OVERFLOW是通过发送表示某些文件系统事件可能丢失的。
此时,根目录会被完全重新扫描。
为避免重复,考虑使用合适的文件列表过滤器(例如AcceptOnceFileListFilter)或在处理完成后删除文件。 |
这WatchService目录扫描器可以通过FileReadingMessageSource.use-watch-service该选项与扫描器选择。
一个内部文件阅读信息源.WatchService目录扫描器实例填充了目录.
此外,现在守望服务轮询逻辑可以追踪StandardWatchEventKinds.ENTRY_MODIFY和StandardWatchEventKinds.ENTRY_DELETE.
如果你需要跟踪现有文件和新文件的修改情况,你应该实现ENTRY_MODIFY事件逻辑文件列表过滤器.
否则,这些事件的文件也会被同样地处理。
这ResettableFileListFilter实现中ENTRY_DELETE事件。
因此,他们的档案被提供给移除()操作。
当该事件被启用时,会有以下过滤器AcceptOnceFileListFilter删除文件。
因此,如果出现同名文件,它会通过过滤器并以消息形式发送。
为此,观察事件性质 (FileReadingMessageSource.setWatchEvents(WatchEventType...观看活动))已经被引入。
(观察事件类型是 中公开的内部枚举文件阅读消息源.)
有了这样的选项,我们可以用一种下游流逻辑处理新文件,用其他逻辑处理修改文件。
以下示例展示了如何配置同一目录中创建和修改事件的不同逻辑:
值得一提的是ENTRY_DELETE事件涉及被监视目录子目录的重命名作。
更具体地说,ENTRY_DELETE事件,与前一个目录名称相关,先于ENTRY_CREATE该事件通知新(重命名)目录的存在。
在某些作系统(如 Windows)上,以下ENTRY_DELETE事件必须注册以应对这种情况。
否则,在文件资源管理器中重命名已观看子目录可能会导致新文件在该子目录中无法被检测到。
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
从6.1版本开始,文件阅读消息源揭露了两项新守望服务- 相关选项:
-
观看MaxDepth。- 关于Files.walkFileTree(路径根,Set attributes,整数 maxDepth,FileVisitor visitor)应用程序接口; -
观看DirPredicate-一个谓词<路径>测试扫描树中的目录是否应被走动并注册给守望服务以及配置好的观察事件类型。
限制内存消耗
你可以用头目录扫描器以限制内存中保留的文件数量。
这在扫描大型目录时非常有用。
通过设置 XML 配置实现队列大小入站通道适配器上的属性。
在4.2版本之前,该设置与其他Filter不兼容。
其他过滤器(包括)防止重复=“true”) 覆盖了用于限制大小的Filter。
|
使用 通常,不使用 |
使用 Java 配置配置
以下 Spring Boot 应用程序展示了如何用 Java 配置配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
使用 Java DSL 配置
以下 Spring Boot 应用程序展示了如何用 Java DSL 配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
“跟踪档案”
另一个常见的用例是从文件的尾部获取“行”,并在新增行时捕捉它们。
提供两种实现方式。
第一,OSDelegatingFileTailingMessageProducer,使用原生尾巴命令(在拥有命令的作系统上)。
这通常是这些平台上最高效的实现方式。
对于没有尾巴命令,第二个实现,ApacheCommonsFile尾随消息制作人,使用 ApacheCommons-IO 尾商类。
在这两种情况下,文件系统事件,如文件不可用和其他事件,都发布为ApplicationEvent通过使用正常的 Spring 事件发布机制来实现实例。
此类事件的例子包括:
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
例如,当文件被旋转时,可能出现前述示例中显示的事件序列。
从5.0版本开始,aFileTailingIdleEvent当文件中没有数据时,会发出idleEventInterval.
以下示例展示了此类事件的样貌:
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有平台都支持尾巴命令提供这些状态消息。 |
这些端点发出的消息具有以下头部:
-
FileHeaders.ORIGINAL_FILE:这文件对象 -
FileHeaders.FILENAME: 文件名 (File.getName())
在5.0版本之前的版本中,FileHeaders.FILENAME头部包含文件绝对路径的字符串表示。
你现在可以通过调用 得到该字符串表示getAbsolutePath()在原始文件头部。 |
以下示例创建了一个原生适配器,默认选项为“-F -n 0”,表示从当前端跟随文件名。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
以下示例创建了一个带有“-F -n +0”选项的原生适配器(即跟随文件名,输出所有现有行)。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>
如果尾巴命令失败(在某些平台上,缺失文件会导致尾巴即使失败,也无法-F指定),命令每10秒重试一次。
默认情况下,本地适配器从标准输出捕获内容并以消息形式发送。
它们还通过标准误捕获事件来引发事件。
从4.3.6版本开始,你可以通过设置使能状态读取器自false,如下示例所示:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
在以下例子中,IdleEventInterval设置为5000,意味着如果五秒内没有写入任何行,FileTailingIdleEvent每五秒触发一次:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
这在你需要停止适配器时非常有用。
以下示例创建了一个 ApacheCommons-IO 尾商适配器每两秒检查一次文件是否有新行,每十秒检查一次缺失文件:
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" (1)
reopen="true" (2)
file-delay="10000"/>
| 1 | 文件从一开始就被尾部跟踪(end=“false”)而不是结尾(默认的)。 |
| 2 | 每个分块都会重新打开文件(默认是保持文件开启)。 |
具体说明延迟,结束或重开属性强制使用 ApacheCommons-IO适配器并制造原生选项属性不可用。 |
处理不完整数据
文件传输场景中常见的问题是如何判断传输是否完成,以避免开始读取不完整的文件。
解决这个问题的常见方法是先用临时名称写入文件,然后原子重命名为最终名称。
这种技术,加上一个遮蔽临时文件不被消费者拾取的过滤器,提供了一个稳健的解决方案。
该技术被用于 Spring 集成组件,用于本地或远程写入文件。
默认情况下,它们会附加。写作迁移完成后删除文件名。