|
对于最新稳定版本,请使用Spring Batch Documentation 6.0.0! |
通过消息启动批处理作业
使用核心的 Spring Batch API 启动批处理作业时,基本上有两个选择:
-
从命令行中,使用
命令行作业跑者 -
在程序化方面,任一
JobOperator.start()或JobLauncher.run()
例如,你可能想使用命令行作业跑者调用批处理作业时,通过使用shell脚本。或者,你也可以使用作业操作员直接(例如,在使用Spring Batch 作为网页应用的一部分时)。但是,如果更复杂的用例呢?也许你需要轮询远程(S)FTP服务器以获取批处理作业的数据,或者你的应用程序必须同时支持多个不同的数据源。 为 例如,你可能不仅会从网络接收数据文件,还会从FTP 和其他来源接收数据文件。也许输入文件需要额外的转换然后才调用 Spring Batch。
因此,通过使用 Spring Integration 及其众多适配器执行批处理任务会更强大。使用 Spring Integration 及其众多适配器。 例如 你可以使用文件入站通道适配器来监控文件系统中的一个目录,并在输入文件到达时立即启动批处理作业。此外,您还可以创建 Spring集成流程,使用多个不同的适配器轻松地从多个来源获取批处理作业的数据仅通过配置即可同时完成。实现所有这些场景与Spring Integration非常简单,因为它允许解耦、事件驱动的执行JobLauncher.
春季批处理集成提供了JobLaunchingMessageHandler你可以用来启动批处理作业的类。输入JobLaunchingMessageHandler由Spring Integration 消息提供,其有效载荷类型为JobLaunchRequest. 该类是包裹工作将发射并绕过作业参数这些是启动批处理作业所必需的。
下图展示了典型的 Spring 集成启动批处理作业所需的消息流。EIP(企业集成模式)网站提供了消息图标及其描述的完整概述。
将文件转换为 JobLaunchRequest
以下示例将文件转换为JobLaunchRequest:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import java.io.File;
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
JobExecution 响应
当批处理作业正在执行时,一个作业执行实例被返回。你可以利用这个实例来确定执行状态。 如果 一个作业执行能够被创建成功后,无论实际执行是否成功,它总是被返回。实际执行是否成功。
具体行为如何作业执行实例返回取决于所提供的任务执行者. 如果同步(单螺纹)任务执行者实现时,使用作业执行仅返回后工作完成。当使用异步
任务执行者这作业执行实例返回 马上。 然后你可以拿身份证之作业执行实例 (其中JobExecution.getJobId())并查询JobRepository关于该职位的更新状态使用JobExplorer. 欲了解更多信息请参见查询仓库。
春季批次集成配置
考虑一个情况,有人需要创建一个文件入站信道适配器监听对于提供目录中的CSV文件,将其交给变换器 (文件消息到工作请求),通过作业启动网关启动作业,并且记录 的输出作业执行其中日志通道适配器.
-
Java
-
XML
以下示例展示了如何在 Java 中配置这种常见情况:
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
transform(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
以下示例展示了如何在XML中配置这种常见情况:
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-file:inbound-channel-adapter id="filePoller"
channel="inboundFileChannel"
directory="file:/tmp/myfiles/"
filename-pattern="*.csv">
<int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="io.spring.sbi.FileMessageToJobRequest">
<property name="job" ref="personJob"/>
<property name="fileParameterName" value="input.file.name"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
示例 ItemReader 配置
既然我们已经轮询文件并启动作业,就需要配置我们的 Spring 批物品阅读器(例如)使用作业定义的位置文件参数称为“input.file.name”,如下 bean 配置所示:
-
Java
-
XML
以下 Java 示例展示了所需的豆配置:
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new FileSystemResource(resource));
...
return flatFileItemReader;
}
以下XML示例展示了所需的豆子配置:
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
scope="step">
<property name="resource" value="file://#{jobParameters['input.file.name']}"/>
...
</bean>
前述示例的主要关注点是如何注入#{jobParameters['input.file.name']}作为资源属性值,并将物品阅读器豆 拥有步进作用域。设置 Bean 具有步进作用域利用了晚期绑定支持,允许访问jobParameters(工作参数)变量。