Externalizing Batch Process Execution

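The integration approaches discussed so far suggest use cases in which Spring Integration wraps Spring Batch like an outer shell. However, Spring Batch can also use Spring Integration internally. With this approach, Spring Batch users can delegate the processing of items, or even of whole chunks, to outside processes, which lets you offload complex processing. Spring Batch Integration provides dedicated support for remote chunking and for remote partitioning.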

Remote Chunking

The following image shows one way that remote chunking works when you use Spring Batch together with Spring Integration:

Figure 1. Remote Chunking

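Taking things one step further, you can also externalize the chunk processing by using the ChunkMessageChannelItemWriter (provided by Spring Batch Integration), which sends items out and collects the results. Once the items are sent, Spring Batch continues reading and grouping items without waiting for the results. Instead, it is the responsibility of the ChunkMessageChannelItemWriter to gather the results and integrate them back into the Spring Batch process.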

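With Spring Integration, you have full control over the concurrency of your processes (for instance, by using a QueueChannel instead of a DirectChannel). Furthermore, by relying on Spring Integration's rich collection of channel adapters (such as JMS and AMQP), you can distribute the chunks of a batch job to external systems for processing.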
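The flow described above can be illustrated without any Spring classes. The following is a minimal sketch in plain Java, under the assumption that two in-memory BlockingQueue instances stand in for the request and reply channels and that a thread pool stands in for the remote workers. The class and method names (RemoteChunkingSketch, run) are hypothetical and are not part of Spring Batch or Spring Integration. The sketch shows the manager sending every chunk without waiting and only afterwards collecting one reply per chunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: plain Java queues stand in for message channels.
class RemoteChunkingSketch {

    /** Sends items to workers in chunks and returns how many items the workers acknowledged. */
    static int run(List<Integer> items, int chunkSize, int workerCount) {
        if (chunkSize < 1 || workerCount < 1) {
            throw new IllegalArgumentException("chunkSize and workerCount must be positive");
        }
        BlockingQueue<List<Integer>> requests = new LinkedBlockingQueue<>(); // manager -> workers
        BlockingQueue<Integer> replies = new LinkedBlockingQueue<>();        // workers -> manager
        ExecutorService workers = Executors.newFixedThreadPool(workerCount);
        try {
            // Each worker takes chunks as they arrive and replies with the chunk size.
            Callable<Void> worker = () -> {
                while (true) {
                    List<Integer> chunk = requests.take(); // interrupted on shutdown
                    replies.put(chunk.size());             // processing and writing would happen here
                }
            };
            for (int w = 0; w < workerCount; w++) {
                workers.submit(worker);
            }

            // Manager: group items into chunks and send each one without waiting for a reply.
            int sentChunks = 0;
            for (int i = 0; i < items.size(); i += chunkSize) {
                int end = Math.min(i + chunkSize, items.size());
                requests.put(new ArrayList<>(items.subList(i, end)));
                sentChunks++;
            }

            // Manager: gather one reply per chunk, giving up after a timeout.
            int acknowledged = 0;
            for (int i = 0; i < sentChunks; i++) {
                Integer reply = replies.poll(2, TimeUnit.SECONDS);
                if (reply == null) {
                    throw new IllegalStateException("Timed out waiting for a worker reply");
                }
                acknowledged += reply;
            }
            return acknowledged;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while exchanging chunks", e);
        } finally {
            workers.shutdownNow(); // interrupts workers blocked in take()
        }
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(run(items, 3, 2)); // prints 10
    }
}
```

Because the request queue buffers chunks, the number of worker threads can be changed without touching the sending loop, which loosely mirrors the kind of concurrency control that a queue-backed channel offers.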

A job with a step to be remotely chunked might have a configuration similar to the following in Java:

Java Configuration
@Bean
public Job chunkJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new JobBuilder("personJob", jobRepository)
            .start(new StepBuilder("step1", jobRepository)
                    .<Person, Person>chunk(200, transactionManager)
                    .reader(itemReader())
                    .writer(itemWriter())
                    .build())
            .build();
}

A job with a step to be remotely chunked might have a configuration similar to the following in XML:

XML Configuration
<job id="personJob">
  <step id="step1">
    <tasklet>
      <chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
    </tasklet>
    ...
  </step>
</job>

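The ItemReader reference points to the bean you want to use for reading data on the manager. The ItemWriter reference points to a special ItemWriter (called ChunkMessageChannelItemWriter), as described earlier. The processor (if any) is left out of the manager configuration, because it is configured on the worker. When implementing your use case, you should also check any additional component properties, such as throttle limits.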

The following Java configuration provides a basic manager setup:

Java Configuration
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure outbound flow (requests going to workers)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(requests())
            .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
            .get();
}

/*
 * Configure inbound flow (replies coming from workers)
 */
@Bean
public QueueChannel replies() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
            .channel(replies())
            .get();
}

/*
 * Configure the ChunkMessageChannelItemWriter
 */
@Bean
public ItemWriter<Integer> itemWriter() {
    MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setDefaultChannel(requests());
    messagingTemplate.setReceiveTimeout(2000);
    ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
            = new ChunkMessageChannelItemWriter<>();
    chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
    chunkMessageChannelItemWriter.setReplyChannel(replies());
    return chunkMessageChannelItemWriter;
}

The following XML configuration provides a basic manager setup:

XML Configuration
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>

<bean id="messagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate">
  <property name="defaultChannel" ref="requests"/>
  <property name="receiveTimeout" value="2000"/>
</bean>

<bean id="itemWriter"
    class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    scope="step">
  <property name="messagingOperations" ref="messagingTemplate"/>
  <property name="replyChannel" ref="replies"/>
</bean>

<int:channel id="replies">
  <int:queue/>
</int:channel>

<int-jms:message-driven-channel-adapter id="jmsReplies"
    destination-name="replies"
    channel="replies"/>

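The preceding configuration provides a number of beans. The messaging middleware is configured by using ActiveMQ and the inbound and outbound JMS adapters provided by Spring Integration. As shown, the itemWriter bean, which is referenced by the job step, uses the ChunkMessageChannelItemWriter to write chunks over the configured middleware.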

Now we can move on to the worker configuration. The following example shows the worker configuration in Java:

Java Configuration
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure inbound flow (requests coming from the manager)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
            .channel(requests())
            .get();
}

/*
 * Configure outbound flow (replies going to the manager)
 */
@Bean
public DirectChannel replies() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(replies())
            .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
            .get();
}

/*
 * Configure the ChunkProcessorChunkHandler
 */
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
    ChunkProcessor<Integer> chunkProcessor
            = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
    ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
            = new ChunkProcessorChunkHandler<>();
    chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
    return chunkProcessorChunkHandler;
}

The following example shows the worker configuration in XML:

XML Configuration
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int:channel id="requests"/>
<int:channel id="replies"/>

<int-jms:message-driven-channel-adapter id="incomingRequests"
    destination-name="requests"
    channel="requests"/>

<int-jms:outbound-channel-adapter id="outgoingReplies"
    destination-name="replies"
    channel="replies">
</int-jms:outbound-channel-adapter>

<int:service-activator id="serviceActivator"
    input-channel="requests"
    output-channel="replies"
    ref="chunkProcessorChunkHandler"
    method="handleChunk"/>

<bean id="chunkProcessorChunkHandler"
    class="org.springframework.batch.integration.chunk.ChunkProcessorChunkRequestHandler">
  <property name="chunkProcessor">
    <bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
      <property name="itemWriter">
        <bean class="io.spring.sbi.PersonItemWriter"/>
      </property>
      <property name="itemProcessor">
        <bean class="io.spring.sbi.PersonItemProcessor"/>
      </property>
    </bean>
  </property>
</bean>

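Most of these configuration items should look familiar from the manager configuration. Workers need access neither to the Spring Batch JobRepository nor to the actual job configuration file. The main bean of interest is the chunkProcessorChunkHandler. The chunkProcessor property of ChunkProcessorChunkRequestHandler takes a configured SimpleChunkProcessor, which is where you provide a reference to the ItemWriter (and, optionally, the ItemProcessor) that runs on the worker when it receives chunks from the manager.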
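What the worker does with each incoming chunk can be sketched in plain Java. The following is an illustration only. It assumes hypothetical names (ChunkHandlerSketch, handleChunk) and uses java.util.function types in place of the Spring Batch ItemProcessor and ItemWriter interfaces. Each item passes through the processor, items for which the processor returns null are filtered out (the same convention Spring Batch applies to an ItemProcessor that returns null), and the remaining items are handed to the writer as one list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration only: not the Spring Batch SimpleChunkProcessor.
class ChunkHandlerSketch {

    /**
     * Processes every item of a chunk, drops items whose processed value is null,
     * writes the survivors in one call, and returns how many items were written.
     */
    static int handleChunk(List<Integer> chunk,
                           Function<Integer, Integer> processor,
                           Consumer<List<Integer>> writer) {
        List<Integer> outputs = new ArrayList<>();
        for (Integer item : chunk) {
            Integer processed = processor.apply(item);
            if (processed != null) { // null means "filter this item out"
                outputs.add(processed);
            }
        }
        if (!outputs.isEmpty()) {    // in this sketch, an empty result skips the writer
            writer.accept(outputs);
        }
        return outputs.size();
    }

    public static void main(String[] args) {
        List<Integer> written = new ArrayList<>();
        // Keep even numbers (multiplied by 10) and filter odd numbers.
        int count = handleChunk(List.of(1, 2, 3, 4), x -> x % 2 == 0 ? x * 10 : null, written::addAll);
        System.out.println(count + " " + written); // prints 2 [20, 40]
    }
}
```

The returned count plays the role of the reply that the worker sends back to the manager once the chunk has been handled.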

For more information, see the section of the "Scalability" chapter on remote chunking.

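Starting with version 4.1, Spring Batch Integration provides the @EnableBatchIntegration annotation, which you can use to simplify a remote chunking setup. The annotation provides two beans that you can autowire in your application context: RemoteChunkingManagerStepBuilderFactory, which configures the manager step, and RemoteChunkingWorkerBuilder, which configures the remote worker integration flow.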

These APIs take care of configuring a number of components, as the following diagram shows:

Figure 2. Remote Chunking Configuration

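On the manager side, the RemoteChunkingManagerStepBuilderFactory lets you configure a manager step by declaring the item reader that reads the items to send to workers, the output channel on which requests are sent to workers, and the input channel on which replies are received from workers.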

You need not explicitly configure the ChunkMessageChannelItemWriter and the MessagingTemplate. (You can still configure them explicitly if you find a reason to do so.)

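On the worker side, the RemoteChunkingWorkerBuilder lets you configure a worker that listens for requests sent by the manager on the input channel, handles each request with the configured ItemProcessor and ItemWriter, and sends replies to the manager on the output channel.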

You need not explicitly configure the SimpleChunkProcessor and the ChunkProcessorChunkRequestHandler. (You can still configure them explicitly if you find a reason to do so.)

The following example shows how to use these APIs:

@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {

    @Configuration
    public static class ManagerConfiguration {

        @Autowired
        private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;

        @Bean
        public TaskletStep managerStep() {
            return this.managerStepBuilderFactory.get("managerStep")
                       .chunk(100)
                       .reader(itemReader())
                       .outputChannel(requests()) // requests sent to workers
                       .inputChannel(replies())   // replies received from workers
                       .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemoteChunkingWorkerBuilder workerBuilder;

        @Bean
        public IntegrationFlow workerFlow() {
            return this.workerBuilder
                       .itemProcessor(itemProcessor())
                       .itemWriter(itemWriter())
                       .inputChannel(requests()) // requests received from the manager
                       .outputChannel(replies()) // replies sent to the manager
                       .build();
        }

        // Middleware beans setup omitted

    }

}

A complete example of a remote chunking job is available in the Spring Batch samples.

Remote Partitioning

The following image shows a typical remote partitioning situation:

Figure 3. Remote Partitioning

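Remote partitioning, on the other hand, is useful when it is not the processing of items but rather the associated I/O that causes the bottleneck. With remote partitioning, you can send work to workers that execute complete Spring Batch steps. Thus, each worker has its own ItemReader, ItemProcessor, and ItemWriter. For this purpose, Spring Batch Integration provides the MessageChannelPartitionHandler.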

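This implementation of the PartitionHandler interface uses MessageChannel instances to send instructions to remote workers and to receive their responses. This provides a clean abstraction over the transports (such as JMS and AMQP) that are used to communicate with the remote workers.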
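The difference from remote chunking can be sketched in plain Java. The following illustration assumes hypothetical names (RemotePartitioningSketch, workerStep, run) and uses a thread pool in place of remote workers and Future results in place of reply messages; none of it is Spring API. The manager sends only the boundaries of each partition, and each worker reads, processes, and writes its own range.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only: threads stand in for remote worker JVMs.
class RemotePartitioningSketch {

    /** A complete worker "step": reads, processes, and writes its own ID range. */
    static int workerStep(int fromId, int toId) {
        int written = 0;
        for (int id = fromId; id <= toId; id++) {
            int item = id;            // "reader": the worker loads its own data
            int processed = item * 2; // "processor": placeholder transformation
            if (processed >= 0) {     // "writer": placeholder for persisting the item
                written++;
            }
        }
        return written;
    }

    /** Manager: splits [minId, maxId] into at most gridSize ranges and aggregates the replies. */
    static int run(int minId, int maxId, int gridSize) {
        if (gridSize < 1 || maxId < minId) {
            throw new IllegalArgumentException("gridSize must be positive and maxId >= minId");
        }
        ExecutorService workers = Executors.newFixedThreadPool(gridSize);
        try {
            int total = maxId - minId + 1;
            int rangeSize = (total + gridSize - 1) / gridSize; // ceiling division
            List<Future<Integer>> replies = new ArrayList<>();
            for (int start = minId; start <= maxId; start += rangeSize) {
                final int from = start;
                final int to = Math.min(start + rangeSize - 1, maxId);
                // Only the range boundaries travel to the worker, not the items themselves.
                replies.add(workers.submit(() -> workerStep(from, to)));
            }
            int written = 0;
            for (Future<Integer> reply : replies) {
                written += reply.get(); // wait for every partition to finish
            }
            return written;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A worker step failed", e.getCause());
        } finally {
            workers.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(1, 10, 3)); // prints 10
    }
}
```

Since the workers do their own reading and writing, the manager's I/O load in this sketch stays constant regardless of how many items each partition contains.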

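The section of the "Scalability" chapter that addresses remote partitioning gives an overview of the concepts and components needed to configure remote partitioning and shows an example that uses the default TaskExecutorPartitionHandler to partition in separate local threads of execution. Remote partitioning across multiple JVMs requires two additional components: a remoting fabric or grid environment, and a PartitionHandler implementation that supports that remoting fabric or grid environment.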

Similar to remote chunking, you can use JMS as the "remoting fabric". In that case, use a MessageChannelPartitionHandler instance as the PartitionHandler implementation, as described earlier.

The following example assumes an existing partitioned job and focuses on the MessageChannelPartitionHandler and JMS configuration in Java:

Java Configuration
/*
 * Configuration of the manager side
 */
@Bean
public PartitionHandler partitionHandler() {
    MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
    partitionHandler.setStepName("step1");
    partitionHandler.setGridSize(3);
    partitionHandler.setReplyChannel(outboundReplies());
    MessagingTemplate template = new MessagingTemplate();
    template.setDefaultChannel(outboundRequests());
    template.setReceiveTimeout(100000);
    partitionHandler.setMessagingOperations(template);
    return partitionHandler;
}

@Bean
public QueueChannel outboundReplies() {
    return new QueueChannel();
}

@Bean
public DirectChannel outboundRequests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsRequests() {
    return IntegrationFlow.from("outboundRequests")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("requestsQueue"))
            .get();
}

@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
    AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
    aggregatorFactoryBean.setProcessorBean(partitionHandler());
    aggregatorFactoryBean.setOutputChannel(outboundReplies());
    // configure other properties of the aggregatorFactoryBean
    return aggregatorFactoryBean;
}

@Bean
public DirectChannel inboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundJmsStaging() {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("stagingQueue"))
            .channel(inboundStaging())
            .get();
}

/*
 * Configuration of the worker side
 */
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
    stepExecutionRequestHandler.setJobExplorer(jobExplorer);
    stepExecutionRequestHandler.setStepLocator(stepLocator());
    return stepExecutionRequestHandler;
}

@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
    return stepExecutionRequestHandler();
}

@Bean
public DirectChannel inboundRequests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundJmsRequests() {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("requestsQueue"))
            .channel(inboundRequests())
            .get();
}

@Bean
public DirectChannel outboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsStaging() {
    return IntegrationFlow.from("outboundStaging")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("stagingQueue"))
            .get();
}

The following example assumes an existing partitioned job and focuses on the MessageChannelPartitionHandler and JMS configuration in XML:

XML Configuration
<bean id="partitionHandler"
   class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
  <property name="stepName" value="step1"/>
  <property name="gridSize" value="3"/>
  <property name="replyChannel" ref="outbound-replies"/>
  <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
      <property name="defaultChannel" ref="outbound-requests"/>
      <property name="receiveTimeout" value="100000"/>
    </bean>
  </property>
</bean>

<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
    channel="outbound-requests"/>

<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
    channel="inbound-requests"/>

<bean id="stepExecutionRequestHandler"
    class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
  <property name="jobExplorer" ref="jobExplorer"/>
  <property name="stepLocator" ref="stepLocator"/>
</bean>

<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
    output-channel="outbound-staging"/>

<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
    channel="outbound-staging"/>

<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
    channel="inbound-staging"/>

<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
    output-channel="outbound-replies"/>

<int:channel id="outbound-replies">
  <int:queue/>
</int:channel>

<bean id="stepLocator"
    class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />

You must also ensure that the partition handler attribute maps to the partitionHandler bean.

The following example maps the partition handler attribute to the partitionHandler bean in Java:

Java Configuration
	public Job personJob(JobRepository jobRepository) {
		return new JobBuilder("personJob", jobRepository)
				.start(new StepBuilder("step1.manager", jobRepository)
						.partitioner("step1.worker", partitioner())
						.partitionHandler(partitionHandler())
						.build())
				.build();
	}
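The partitioner() referenced in the Java configuration above is not shown in this section. One common way to split the work is by ID range. The following plain-Java sketch shows that arithmetic only. RangePartitionSketch and its partition method are hypothetical, as are the "partition" key prefix and the ID-range scheme; a real Spring Batch Partitioner would return one ExecutionContext per partition name rather than an int array.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: shows range arithmetic, not the Spring Batch Partitioner API.
class RangePartitionSketch {

    /**
     * Splits the inclusive range [minId, maxId] into at most gridSize contiguous ranges.
     * Each map value holds {firstId, lastId} for the partition named by the key.
     */
    static Map<String, int[]> partition(int minId, int maxId, int gridSize) {
        if (gridSize < 1 || maxId < minId) {
            throw new IllegalArgumentException("gridSize must be positive and maxId >= minId");
        }
        int total = maxId - minId + 1;
        int targetSize = (total + gridSize - 1) / gridSize; // ceiling division
        Map<String, int[]> partitions = new LinkedHashMap<>();
        int start = minId;
        // Fewer than gridSize partitions are produced when there are fewer IDs than partitions.
        for (int i = 0; i < gridSize && start <= maxId; i++) {
            int end = Math.min(start + targetSize - 1, maxId);
            partitions.put("partition" + i, new int[] {start, end});
            start = end + 1;
        }
        return partitions;
    }

    public static void main(String[] args) {
        partition(1, 10, 3).forEach((name, range) ->
                System.out.println(name + ": " + range[0] + "-" + range[1]));
    }
}
```

With a grid size of 3 and IDs 1 through 10, the sketch yields the ranges 1-4, 5-8, and 9-10, so the last partition is smaller than the others.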

The following example maps the partition handler attribute to the partitionHandler bean in XML:

XML Configuration
<job id="personJob">
  <step id="step1.manager">
    <partition partitioner="partitioner" handler="partitionHandler"/>
    ...
  </step>
</job>

A complete example of a remote partitioning job is available in the Spring Batch samples.

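You can use the @EnableBatchIntegration annotation to simplify a remote partitioning setup. This annotation provides two beans that are useful for remote partitioning: RemotePartitioningManagerStepBuilderFactory, which configures the manager step, and RemotePartitioningWorkerStepBuilderFactory, which configures the worker step.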

These APIs take care of configuring a number of components, as the following diagrams show:

Figure 4. Remote Partitioning Configuration (with job repository polling)
Figure 5. Remote Partitioning Configuration (with replies aggregation)

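On the manager side, the RemotePartitioningManagerStepBuilderFactory lets you configure a manager step by declaring the Partitioner used to partition the data, the output channel on which requests are sent to workers, and the input channel on which replies are received from workers (when the replies are aggregated rather than detected by polling the job repository).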

You need not explicitly configure the MessageChannelPartitionHandler and the MessagingTemplate. (You can still configure them explicitly if you find a reason to do so.)

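On the worker side, the RemotePartitioningWorkerStepBuilderFactory lets you configure a worker that listens for requests sent by the manager on the input channel, handles each request with the StepExecutionRequestHandler, and sends replies to the manager on the output channel.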

You need not explicitly configure the StepExecutionRequestHandler. (You can still configure it explicitly if you find a reason to do so.)

The following example shows how to use these APIs:

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {

    @Configuration
    public static class ManagerConfiguration {

        @Autowired
        private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;

        @Bean
        public Step managerStep() {
            return this.managerStepBuilderFactory
                    .get("managerStep")
                    .partitioner("workerStep", partitioner())
                    .gridSize(10)
                    .outputChannel(outgoingRequestsToWorkers())
                    .inputChannel(incomingRepliesFromWorkers())
                    .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

        @Bean
        public Step workerStep() {
            return this.workerStepBuilderFactory
                    .get("workerStep")
                    .inputChannel(incomingRequestsFromManager())
                    .outputChannel(outgoingRepliesToManager())
                    .chunk(100)
                    .reader(itemReader())
                    .processor(itemProcessor())
                    .writer(itemWriter())
                    .build();
        }

        // Middleware beans setup omitted

    }

}