|
对于最新稳定版本,请使用 Spring Integration 7.0.0! |
文件分流器
这文件分拆器在4.1.2版本中加入,其命名空间支持也在4.2版本中加入。
这文件分拆器将文本文件拆分为单独的行,基于BufferedReader.readLine().
默认情况下,分线器使用迭 代在从文件中读取时,逐行输出。
设置迭 代属性到false它会先将所有行读取到内存中,然后再以消息形式发送。
一个用例可能是你想在发送包含行的消息前检测文件上的I/O错误。
然而,它只适用于相对较短的文件。
入站有效载荷可以是文件,字符串(a文件路径),输入流或读者.
其他有效载荷类型则保持不变。
以下列表展示了配置文件分拆器:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@SpringBootApplication
public class FileSplitterApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}
}
@Bean
fun fileSplitterFlow() =
integrationFlow(
Files.inboundAdapter(tmpDir.getRoot())
.filter(
ChainFileListFilter<File?>()
.addFilter(AcceptOnceFileListFilter())
.addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
)
) {
split(
Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true)
)
channel { queue("fileSplittingResultChannel") }
}
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
FileSplitter splitter = new FileSplitter(true, true);
splitter.setApplySequence(true);
splitter.setOutputChannel(outputChannel);
return splitter;
}
<int-file:splitter id="splitter" (1)
iterator="" (2)
markers="" (3)
markers-json="" (4)
apply-sequence="" (5)
requires-reply="" (6)
charset="" (7)
first-line-as-header="" (8)
input-channel="" (9)
output-channel="" (10)
send-timeout="" (11)
auto-startup="" (12)
order="" (13)
phase="" /> (14)
| 1 | 分水机的豆子名字。 |
| 2 | 设置为true(默认情况下)使用迭代器或false在发送行之前将文件加载到内存中。 |
| 3 | 设置为true在文件数据之前和之后发送文件开始和结束的标记信息。
标记是消息,具有FileSplitter.FileMarker有效载荷(其中开始和结束在马克财产)。
在下游流程中顺序处理文件时,有些行被过滤,可以用标记。
它们使下游处理能够知道文件是否已完全处理完毕。
此外,还有一个file_marker包含开始或结束被添加到这些消息中。
这结束标记包含行数。
如果文件是空的,只有开始和结束标记物发射为0作为行计数.
默认为false.
什么时候true,应用序列是false默认。
参见markers-json(下一个属性)。 |
| 4 | 什么时候标记是真,设为true拥有FileMarker对象可以转换为 JSON 字符串。
(使用SimpleJsonSerializer在下面)。 |
| 5 | 设置为false以禁用包含序列大小和序列号消息中的头部。
默认为true除非标记是true.
什么时候true和标记是true,这些标记被包含在测序中。
什么时候true和迭 代是true这序列大小首部设置为0,因为其大小未知。 |
| 6 | 设置为true导致请求回复例外如果文件中没有行,则会被抛弃。
默认为false. |
| 7 | 将字符集名称设置为读取文本数据时使用字符串负载。
默认是平台字符集。 |
| 8 | 第一行的头部名称作为后续行消息的头部。 自5.0版本起。 |
| 9 | 设置用于向分路器发送消息的输入信道。 |
| 10 | 设置发送消息的输出信道。 |
| 11 | 设置发送超时。
仅在以下情况下适用。输出通道可以阻挡——例如全封队列通道. |
| 12 | 设置为false禁用在上下文刷新时自动启动分线器的功能。
默认为true. |
| 13 | 如果输入通道是<发布-订阅-频道/>. |
| 14 | 为分配器设置启动阶段(使用自动启动是true). |
这文件分拆器还能拆分任何基于文本的内容输入流排成线。
从4.3版本开始,当与使用FTP或SFTP流式入站通道适配器或使用使用以下条件的FTP或SFTP出站网关时,流选择获取文件时,分流器会自动关闭支持该流的会话,当文件被完全消耗时
有关这些设施的更多信息,请参见FTP流式入站通道适配器和SFTP流式入站通道适配器,以及FTP出站网关和SFTP出站网关。
使用 Java 配置时,还提供了一个额外的构造函数,如下示例所示:
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
什么时候markersJson是真的,标记以 JSON 字符串表示(使用SimpleJsonSerializer).
5.0 版本引入了firstLineAsHeader选择将第一行内容指定为头部(例如CSV文件中的列名)。
传递给该属性的参数是首行作为头部的名称,在后续行的消息中作为头部。
该行不包含在序列头部(如果应用序列是真的,也不属于行计数与FileMarker.END.
注意:从5.5版本开始,lineCount'也作为一个FileHeaders.LINE_COUNT进入FileMarker.END消息,自FileMarker可以序列化为JSON。
如果文件仅包含头部行,则该文件被视为空,因此仅FileMarker实例在分拆过程中会被发射(如果启用了标记——否则则不发送消息)。
默认情况下(如果未设置头部名称),第一行被视为数据,成为第一个发出消息的有效载荷。
如果你需要更复杂的头部提取逻辑(不是第一行,不是整行内容,不是某个特定头部等),可以考虑在文件分拆器.
请注意,已移动到头部的行可能会在正常内容处理的下游被过滤。
幂零下游处理分拆文件
什么时候应用序列成立时,分线器将行号加到以下SEQUENCE_NUMBER头部(当标记是真的,标记被计为直线)。
线号可以与幂零接收机一起使用,以避免重启后线路的重处理。
例如:
@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}