通过消息启动批处理作业

使用核心的 Spring Batch API 启动批处理作业时,你 基本上有两个选择:spring-doc.cadn.net.cn

例如,你可能想使用命令行作业操作员调用批处理作业时 使用shell脚本。或者,你也可以使用作业操作员直接(例如,当使用 作为网页应用的一部分使用 Spring Batch)。但是,那又如何呢 更复杂的用例?也许你需要轮询一个远程(S)FTP 服务器以获取批处理作业或应用的数据 必须同时支持多个不同的数据源。为 例如,您可能不仅从网络接收数据文件,还可能来自 FTP和其他渠道。也许对输入文件进行额外的转换是 在调用春季批次之前,需要。spring-doc.cadn.net.cn

因此,执行批量作业会更强大 通过使用 Spring Integration 及其众多适配器。例如 你可以使用文件入站通道适配器 监控文件系统中的一个目录,并以 输入文件一到就要立足。此外,你还可以创建Spring 集成流程使用多个不同适配器,方便使用 从多个来源获取批处理作业的数据 同时仅使用配置即可实现。实现这些 使用 Spring 集成的场景很简单,因为它允许 解耦、事件驱动的执行作业操作员.spring-doc.cadn.net.cn

春季批处理集成提供了JobLaunchingMessageHandler你能 用来启动批处理作业。输入JobLaunchingMessageHandler由 Spring Integration 消息,其有效载荷类型为JobLaunchRequest.该类是包裹工作将发射并绕过作业参数即 启动批次作业所需的。spring-doc.cadn.net.cn

下图展示了典型的Spring积分 启动批处理作业所需的消息流。EIP(企业集成模式)网站提供了消息图标及其描述的完整概述。spring-doc.cadn.net.cn

启动批次作业
图1。启动批次作业

将文件转换为 JobLaunchRequest

以下示例将文件转换为JobLaunchRequest:spring-doc.cadn.net.cn

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

JobExecution 响应

当批处理作业正在执行时,一个作业执行实例被返回。你可以用这个 实例以确定执行状态。如果 一个作业执行能够被创造出来 成功时,无论是否成功,它都会被返回 或者实际执行失败。spring-doc.cadn.net.cn

具体行为如何作业执行实例返回取决于所提供的任务执行者.如果同步(单螺纹)任务执行者实现时,使用作业执行仅返回任务完成。当使用异步 任务执行者作业执行实例返回 马上。然后你可以选择身份证作业执行实例 (其中JobExecution.getJobInstanceId())并查询JobRepository职位的最新状态 使用JobExplorer.更多内容 信息,请参见查询仓库spring-doc.cadn.net.cn

春季批次集成配置

考虑一个情况,有人需要创建一个文件入站信道适配器去倾听 对于提供目录中的CSV文件,交给变换器 (文件消息到工作请求),通过作业启动网关启动作业,并且 log 的输出作业执行其中日志通道适配器.spring-doc.cadn.net.cn

以下示例展示了如何在 Java 中配置这种常见情况:spring-doc.cadn.net.cn

Java 配置
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            transform(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}

以下示例展示了如何在XML中配置这种常见情况:spring-doc.cadn.net.cn

XML 配置
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<int-file:inbound-channel-adapter id="filePoller"
    channel="inboundFileChannel"
    directory="file:/tmp/myfiles/"
    filename-pattern="*.csv">
  <int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
  <bean class="io.spring.sbi.FileMessageToJobRequest">
    <property name="job" ref="personJob"/>
    <property name="fileParameterName" value="input.file.name"/>
  </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

示例 ItemReader 配置

既然我们已经轮询文件并启动作业,就需要配置我们的 Spring 批物品阅读器(例如)使用作业定义位置上的文件 参数称为“input.file.name”,如下豆状配置所示:spring-doc.cadn.net.cn

以下 Java 示例展示了所需的豆配置:spring-doc.cadn.net.cn

Java 配置
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
    FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
    flatFileItemReader.setResource(new FileSystemResource(resource));
...
    return flatFileItemReader;
}

以下XML示例展示了所需的豆子配置:spring-doc.cadn.net.cn

XML 配置
<bean id="itemReader" class="org.springframework.batch.infrastructure.item.file.FlatFileItemReader"
    scope="step">
  <property name="resource" value="file://#{jobParameters['input.file.name']}"/>
    ...
</bean>

前述示例的主要关注点是如何注入#{jobParameters['input.file.name']}作为资源属性值,并将物品阅读器豆 用步进镜。将豆子设置为步进范围可以利用 晚期绑定支持,允许访问jobParameters(工作参数)变量。spring-doc.cadn.net.cn