|
该版本仍在开发中,尚未被视为稳定。对于最新的稳定版本,请使用 Spring Integration 7.0.0! |
散布-聚集
功能性
这散布-聚集模式暗示两种情景:“拍卖”和“分配”。
在这两种情况下,集合体功能相同,提供所有可用的选项聚合消息处理器.
(实际上,散布收集处理器只需一个聚合消息处理器作为构造子论证。)
更多信息请参见聚合器。
拍卖
拍卖散布-聚集变体使用“发布-订阅”逻辑处理请求消息,其中“散射”信道为发布订阅频道跟apply-sequence=“true”.
然而,这个通道可以是任意的消息频道实现(如同请求信道在内容丰富——参见内容丰富剂)。
不过,在这种情况下,你应该自己创建一个自定义相关性策略对于集合体功能。
分配
分布散布-聚集变体基于收件人列表路由器(参见收件人列表路由器)并包含所有可用的选项收件人列表路由器.
这是第二个散布收集处理器构造者论证。
如果你只想依赖默认相关性策略对于收件人列表路由器以及聚合,你应该具体说明apply-sequence=“true”.
否则,你应该提供定制服务相关性策略对于聚合.
与发布订阅频道变体(拍卖变体),具有收件人列表路由器 选择器选项可以根据消息筛选目标提供商。
跟apply-sequence=“true”,默认序列大小是提供的,并且聚合能够正确释放该组。
分发选项与拍卖选项是互斥的。
这applySequence=true仅用于基于ScatterGatherHandler(MessageHandler散布器,MessageHandler收集器)构造器配置,因为框架不能变异外部提供的组件。
为方便起见,XML 和 Java DSL 为散布-聚集集应用序列从6.0版本开始,真理。 |
对于拍卖和分发两种变体,请求(散布)消息都被丰富了gatherResultChannel以 头部等待来自聚合.
默认情况下,所有提供商都应将结果发送给回复频道头部(通常通过省略输出通道从终极终点开始)。
然而,gatherChannel还提供一个选项,允许提供商将回复发送到该渠道进行聚合。
配置散点收集端点
以下示例展示了 bean 定义的 Java 配置散布-聚集:
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}
@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}
@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}
在前面的例子中,我们配置了收件人列表路由器 分配器豆子applySequence=“true”以及接收频道列表。
下一颗豆子是给聚合消息处理器.
最后,我们把这两颗豆子都注射进散布收集处理器豆子定义并标记为@ServiceActivator将散收集组件接入积分流。
以下示例展示了如何配置<散开聚集的声音>通过使用 XML 命名空间实现端点:
<scatter-gather
id="" (1)
auto-startup="" (2)
input-channel="" (3)
output-channel="" (4)
scatter-channel="" (5)
gather-channel="" (6)
order="" (7)
phase="" (8)
send-timeout="" (9)
gather-timeout="" (10)
requires-reply="" > (11)
<scatterer/> (12)
<gatherer/> (13)
</scatter-gather>
| 1 | 端点的ID。
这散布收集处理器Bean的别名为id + '.handler'.
这收件人列表路由器Bean的别名为id + '.scatterer'.
这聚合消息处理器Bean的别名为id + '.gatherer'.
自选。
(豆子工厂生成违约身份证价值。) |
| 2 | 生命周期属性指示终端是否应在应用上下文初始化时启动。
此外,散布收集处理器其他实现生命周期开始又停止gatherEndpoint,如果 a ,则内部生成收集通道提供。
自选。
(默认为true.) |
| 3 | 接收请求消息的通道,以处理这些请求散布收集处理器.
必填。 |
| 4 | 所指向的通道散布收集处理器发送聚合结果。
自选。
(收到的消息可以在回复频道消息头部)。 |
| 5 | 拍卖场景中散点消息的发送渠道。
自选。
与<散布者>子元素。 |
| 6 | 接收来自每个提供商回复的渠道。
它被用作回复频道散点消息中的头部。
自选。
默认情况下,固定订阅频道被创造出来。 |
| 7 | 当多个处理器订阅同一组件时,该组件的顺序直达频道(用于负载均衡目的)。
自选。 |
| 8 | 指定端点应在哪个阶段开始和停止。
启动订单从低到高依序,关闭订单从高到低。
默认情况下,这个值为Integer.MAX价值,意味着该容器尽可能晚开始,尽快停止。
自选。 |
| 9 | 发送回复时等待的超时间隔消息前往输出通道.
默认情况下,发送()阻挡了一秒钟。
只有当输出通道存在某些“发送”限制时,它才适用——例如,队列通道拥有固定的“容量”,且容量是满的。
在这种情况下,一个MessageDeliveryException被抛出。
这发送超时忽略了摘要订阅频道实现。
为group-timeout(-表达式)这MessageDeliveryException从已排定的任务中,该任务被重新安排。
自选。 |
| 10 | 它允许你指定散布收集等待回复消息的时间,然后返回。
默认情况下,它会等待30秒。
如果回复超时,则返回“null”。
自选。 |
| 11 | 指定散点采集是否必须返回非空值。
该值为true默认。
因此,一个回复必需例外当底层聚合器在 之后返回空值时,会被抛出集合超时.
注意,如果零是一种可能性,且集合超时应明确说明以避免无限期等待。 |
| 12 | 这<收件人列表路由器>选项。
自选。
互斥的散射通道属性。 |
| 13 | 这<聚合器>选项。
必填。 |
错误处理
由于散点收集是多请求-回复组件,错误处理增加了一些复杂性。
在某些情况下,如果发布策略这样流程结束时回复数量少于请求数。
在其他情况下,当发生错误时,应考虑发送类似“补偿消息”的返回。
每个异步子流都应配置为errorChannel用于发送正确错误消息的头部消息发布错误处理.
否则,错误将发送给全局errorChannel采用常见的错误处理逻辑。
有关异步错误处理的更多信息,请参见错误处理。
同步流可能使用ExpressionEvaluatingRequestHandlerAdvice因为忽视异常或返回补偿消息。
当某个子流抛出异常时散布收集处理器,它只是被重新抛向上游。
这样其他子流就不会无效,他们的回复也会被忽略散布收集处理器.
这有时可能是预期行为,但在大多数情况下,处理特定子流的错误时不影响其他所有子流和采集器中的期望会更好。
从5.1.3版本开始,散布收集处理器配备errorChannelName选择。
其人口分布为errorChannel散布消息的头部,用于发生异步错误,或在常规同步子流中直接发送错误消息。
下面的示例配置通过返回补偿消息演示了异步错误处理:
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
为了产生正确的回复,我们必须复制头部(包括回复频道和errorChannel)失败消息关于消息异常该文件已发送至散点收集错误频道由消息发布错误处理.
这样,目标异常会返回给散布收集处理器回复消息组完成。
真是个例外有效载荷可以在消息组处理器收集者或在散收集端点之后的下游处理。
在将散射结果发送给采集者之前,散布收集处理器恢复请求消息头部,包括回复信道和错误信道(如有)。
这样,错误来自聚合消息处理器即使在散布接收者子流中应用异步切换,也会传播给呼叫者。
为了成功作,agatherResultChannel,原始回复频道和原版ErrorChannel(原版ErrorChannel)头部必须从散布接收者子流中返回回复。
在这种情况下,一个合理的有限gatherTimeout必须配置为散布收集处理器.
否则,默认情况下会被永久屏蔽,等待采集者的回复。 |