消息路由
消息路由
本章详细介绍如何使用 Spring Integration 路由消息。
路由器
本节介绍路由器的工作原理。 课程涵盖以下主题:
概述
路由器是许多消息架构中的关键元素。 它们从一个消息通道中接收消息,并根据条件将每个被消耗的消息转发到一个或多个不同的消息通道。
Spring Integration 提供以下路由器:
路由器实现共享许多配置参数。 然而,不同路由器之间存在某些差异。 此外,配置参数的可用性取决于路由器是使用链内还是链外。 为了快速概述,所有可用属性列在以下两张表中。
下表展示了链外路由器可用的配置参数:
| 属性 | 路由器 | 头值布线器 | XPATH 路由器 | 有效载荷型布线器 | 收件人名单路线 | 例外类型路由器 |
|---|---|---|---|---|---|---|
应用序列 |
|
|
|
|
|
|
默认输出通道 |
|
|
|
|
|
|
分辨率要求 |
|
|
|
|
|
|
忽略-发送-失败 |
|
|
|
|
|
|
超时 |
|
|
|
|
|
|
身份证 |
|
|
|
|
|
|
自动启动 |
|
|
|
|
|
|
输入通道 |
|
|
|
|
|
|
次序 |
|
|
|
|
|
|
方法 |
|
|||||
裁判 |
|
|||||
表达 |
|
|||||
首部名称 |
|
|||||
作为字符串的计算 |
|
|||||
xpath-expression-ref |
|
|||||
转炉 |
|
下表展示了链内路由器可用的配置参数:
| 属性 | 路由器 | 头值布线器 | XPATH 路由器 | 有效载荷型布线器 | 收件人列表路由器 | 例外类型路由器 |
|---|---|---|---|---|---|---|
应用序列 |
|
|
|
|
|
|
默认输出通道 |
|
|
|
|
|
|
分辨率要求 |
|
|
|
|
|
|
忽略-发送-失败 |
|
|
|
|
|
|
超时 |
|
|
|
|
|
|
身份证 |
||||||
自动启动 |
||||||
输入通道 |
||||||
次序 |
||||||
方法 |
|
|||||
裁判 |
|
|||||
表达 |
|
|||||
首部名称 |
|
|||||
作为字符串的计算 |
|
|||||
xpath-expression-ref |
|
|||||
转炉 |
|
|
自 Spring Integration 2.1 起,路由器参数在所有路由器实现中更加标准化。 因此,一些小改动可能会破坏基于Spring Integration的旧应用。 自 Spring Integration 2.1 以来, 在这些变化之前, 如果你想静音发送消息,可以设置 |
常见的路由器参数
本节描述了所有路由器参数的共同参数(即本章前两张表格中所有选项均勾选的参数)。
链内与链外
以下参数适用于链内外的所有路由器。
应用序列-
该属性指定是否应为每个消息添加序列号和大小头部。 该可选属性默认为
false. 默认输出通道-
如果设置为该属性,则在通道解析未能返回任何通道时,该通道应发送消息。 如果没有提供默认输出通道,路由器会抛出异常。 如果你想静默地丢弃这些消息,可以将默认输出通道属性值设置为
零通道.从6.0版本开始,设置默认输出通道也会重置 channelKeyFallback选项false. 因此,不会尝试从通道名称中解析通道,而是退回到这个默认输出通道——类似于 Java开关陈述。 如果channelKeyFallback设置为true具体地说,后续逻辑依赖于需要分辨率选项:密钥向未解析信道发送消息可以到达默认输出通道仅当需要分辨率是false. 因此,有一个配置默认输出通道提供且channelKeyFallback&需要分辨率设置为true被 拒绝摘要地图消息路由器初始化阶段。 分辨率要求-
该属性指定通道名称是否必须始终成功解析为存在的通道实例。 如果设置为
true一个消息异常当通道无法解析时,会被提升。 将该属性设置为false导致任何无法解析的通道被忽略。 该可选属性默认为true.消息只发送给 默认输出通道,如果指定,当分辨率要求是false通道未解析。 忽略-发送-失败-
如果设置为
true未发送到消息通道的失败将被忽略。 如果设置为false一个MessageDeliveryException被抛出,如果路由器解析多个信道,后续信道将无法接收该消息。该属性的具体行为取决于
渠道这些信息就是被发送到那里的。 例如,使用直达通道(单线程)时,发送失败可能由更下游组件抛出的异常引起。 然而,当向简单的队列通道发送消息(异步)时,异常被抛出的可能性相当小。虽然大多数路由器只路由到一个信道,但它们可以返回多个信道名称。 这 收件人列表路由器例如,,正是如此。 如果你将这个属性设置为true对于只路由到单信道的路由器,任何引发的异常都会被吞并,这通常没什么意义。 在这种情况下,最好在流入口点发现错误流中的异常。 因此,设忽略-发送-失败归属为true当路由器实现返回多个信道名称时,通常更合理,因为失败信道之后的其他信道仍能接收消息。该属性默认为
false. 超时-
这
超时attribute 指定向目标消息通道发送消息时等待的最大时间(毫秒)。 默认情况下,发送作会无限期阻塞。
路由器实现
由于基于内容的路由通常需要某些域特定的逻辑,大多数用例都需要 Spring Integration 的选项,通过 XML 命名空间支持或注释来委派给 POJO。 这两点后文都会讨论。 不过,我们首先介绍几个满足共同需求的实现。
有效载荷类型路由器
一个有效载荷类型路由器向由有效载荷类型映射定义的信道发送消息,如下示例所示:
<bean id="payloadTypeRouter"
class="org.springframework.integration.router.PayloadTypeRouter">
<property name="channelMapping">
<map>
<entry key="java.lang.String" value-ref="stringChannel"/>
<entry key="java.lang.Integer" value-ref="integerChannel"/>
</map>
</property>
</bean>
配置有效载荷类型路由器也由 Spring Integration 提供的命名空间支持命名空间支持),本质上通过将<路由器/>配置及其对应实现(通过使用<豆/>元素)变成一个单一且更简洁的配置元素。以下示例展示了一个有效载荷类型路由器配置等价于上述配置,但使用命名空间支持:
<int:payload-type-router input-channel="routingChannel">
<int:mapping type="java.lang.String" channel="stringChannel" />
<int:mapping type="java.lang.Integer" channel="integerChannel" />
</int:payload-type-router>
以下示例展示了用 Java 配置的对应路由器:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}
使用 Java DSL 时,有两种选择。
首先,你可以像前面示例所示定义路由器对象:
@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlow.from("routingChannel")
.route(router())
.get();
}
public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}
注意路由器可以是,但不一定是@Bean. 如果流不是@Bean.
其次,你可以在DSL流程本身中定义路由功能,如下示例所示:
@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlow.from("routingChannel")
.<Object, Class<?>>route(Object::getClass, m -> m
.channelMapping(String.class, "stringChannel")
.channelMapping(Integer.class, "integerChannel"))
.get();
}
头值路由器
一个头值路由器根据各个头部值映射向信道发送消息。当头值路由器创建时,它被初始化为待评估的头部名称。头部的值可以是以下两种形式之一:
-
一个任意的数值
-
频道名称
如果是任意值,则需要将这些头值映射到信道名称。否则,无需额外配置。
Spring Integration 提供了一个简单的基于命名空间的 XML 配置,用于配置头值路由器. 以下示例展示了头值路由器当需要将头部值映射到信道时:
<int:header-value-router input-channel="routingChannel" header-name="testHeader">
<int:mapping value="someHeaderValue" channel="channelA" />
<int:mapping value="someOtherHeaderValue" channel="channelB" />
</int:header-value-router>
在解析过程中,前述示例中定义的路由器可能会遇到通道解析失败,导致异常。如果你想抑制此类异常,并将未解析的消息发送到默认输出通道(标识为默认输出通道属性)集合分辨率要求自false.
通常,头值未被显式映射到信道的消息会发送到默认输出通道. 然而,当头部值映射到信道名称但无法解析信道时,可以设置分辨率要求归属为false结果是将此类消息路由到默认输出通道.
自 Spring Integration 2.1 起,属性从忽略通道名称解析失败自分辨率要求. 属性分辨率要求默认为true. |
以下示例展示了用 Java 配置的对应路由器:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter("testHeader");
router.setChannelMapping("someHeaderValue", "channelA");
router.setChannelMapping("someOtherHeaderValue", "channelB");
return router;
}
使用 Java DSL 时,有两种选择。首先,你可以像前面示例所示定义路由器对象:
@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlow.from("routingChannel")
.route(router())
.get();
}
public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter("testHeader");
router.setChannelMapping("someHeaderValue", "channelA");
router.setChannelMapping("someOtherHeaderValue", "channelB");
return router;
}
注意路由器可以是,但不一定是@Bean. 如果流不是@Bean.
其次,你可以在DSL流程本身中定义路由功能,如下示例所示:
@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlow.from("routingChannel")
.route(Message.class, m -> m.getHeaders().get("testHeader", String.class),
m -> m
.channelMapping("someHeaderValue", "channelA")
.channelMapping("someOtherHeaderValue", "channelB"),
e -> e.id("headerValueRouter"))
.get();
}
配置中不需要将头值映射到信道名称,因为头部值本身代表信道名称。以下示例展示了一台路由器不需要将头值映射到信道名:
<int:header-value-router input-channel="routingChannel" header-name="testHeader"/>
|
自 Spring Integration 2.1 以来,解析通道的行为变得更加明确。例如,如果你省略了 基本上,默认情况下,路由器必须能够成功将消息路由到至少一个信道。如果你真的想丢弃消息,你还必须有 |
收件人列表路由器
一个收件人列表路由器将每个收到的消息发送到一个静态定义的消息通道列表。以下示例创建了收件人列表路由器:
<bean id="recipientListRouter"
class="org.springframework.integration.router.RecipientListRouter">
<property name="channels">
<list>
<ref bean="channel1"/>
<ref bean="channel2"/>
<ref bean="channel3"/>
</list>
</property>
</bean>
Spring 集成还为收件人列表路由器配置(参见命名空间支持),如下示例所示:
<int:recipient-list-router id="customRouter" input-channel="routingChannel"
timeout="1234"
ignore-send-failures="true"
apply-sequence="true">
<int:recipient channel="channel1"/>
<int:recipient channel="channel2"/>
</int:recipient-list-router>
以下示例展示了用 Java 配置的对应路由器:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public RecipientListRouter router() {
RecipientListRouter router = new RecipientListRouter();
router.setSendTimeout(1_234L);
router.setIgnoreSendFailures(true);
router.setApplySequence(true);
router.addRecipient("channel1");
router.addRecipient("channel2");
router.addRecipient("channel3");
return router;
}
以下示例展示了使用Java DSL配置的等效路由器:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.routeToRecipients(r -> r
.applySequence(true)
.ignoreSendFailures(true)
.recipient("channel1")
.recipient("channel2")
.recipient("channel3")
.sendTimeout(1_234L))
.get();
}
这里的“应用序列”标志与发布-订阅频道的效果相同,且与发布-订阅频道一样,默认情况下是收件人列表路由器. 看发布订阅频道配置更多信息请见。 |
配置时另一个方便的选项收件人列表路由器是将Spring表达式语言(SpEL)支持作为单个接收信道的选择器。这样做类似于在“链”开头使用Filter作为“选择性消费者”。然而,在这种情况下,这些内容被相当简洁地结合到路由器的配置中,如下示例所示:
<int:recipient-list-router id="customRouter" input-channel="routingChannel">
<int:recipient channel="channel1" selector-expression="payload.equals('foo')"/>
<int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/>
</int:recipient-list-router>
在前述配置中,由选择表达式属性会被评估,以确定该收件人是否应包含在给定输入消息的收件人列表中。
表达式的评估结果必须是布尔.
如果该属性未定义,该信道总是在接收者列表中。
收件人列表路由器管理
从4.1版本开始,收件人列表路由器提供多种作,在运行时动态作接收者。
这些管理作由以下方式呈现收件人列表路由器管理通过@ManagedResource注解。
它们可以通过控制总线和JMX两种方式获得,如下示例所示:
<control-bus input-channel="controlBus"/>
<recipient-list-router id="simpleRouter" input-channel="routingChannelA">
<recipient channel="channel1"/>
</recipient-list-router>
<channel id="channel2"/>
messagingTemplate.convertAndSend(controlBus, "@'simpleRouter.handler'.addRecipient('channel2')");
从申请开始simpleRouter只有一个频道1收件人。
但在addRecipient命令频道2收件人被添加。
这是一种“注册对消息中某项内容感兴趣”的用例,当我们可能在某个时间点对路由器发送的消息感兴趣时,我们订阅了收件人列表路由器然后,在某个时候,决定退订。
由于运行时管理作的关系<收件人列表路由器>,它可以在没有任何情况下进行配置<收件人>从一开始。
在这种情况下,的行为收件人列表路由器当消息没有匹配的接收者时,也是相同的。
如果默认输出通道配置完成后,消息被发送到那里。
否则MessageDeliveryException被抛出。
XPath 路由器
XPath 路由器是 XML 模块的一部分。 参见 XPath 中的 XML 消息路由。
路由与错误处理
Spring Integration 还提供一种特殊的基于类型的路由器,称为错误消息异常类型路由器用于路由错误消息(定义为有效载荷是可投掷实例)。错误消息异常类型路由器与有效载荷类型路由器.
事实上,它们几乎一模一样。
唯一的区别是,虽然有效载荷类型路由器导航有效载荷实例的实例层级(例如,payload.getClass().getSuperclass())以找到最具体的类型和信道映射,即错误消息异常类型路由器在“异常原因”层级中导航(例如,payload.getCause())以找到最具体的可投掷类型或信道映射及用途mappingClass.isInstance(原因)以匹配原因无论是班级还是任何超级班级。
在这种情况下,通道映射顺序很重要。
所以,如果需要获取映射以获取IllegalArgumentException,但不是运行异常最后一个必须先在路由器上配置。 |
自4.3版本起错误消息异常类型路由器在初始化阶段加载所有映射类,使其实现故障快速ClassNotFoundException. |
下例展示了 的示例配置错误消息异常类型路由器:
<int:exception-type-router input-channel="inputChannel"
default-output-channel="defaultChannel">
<int:mapping exception-type="java.lang.IllegalArgumentException"
channel="illegalChannel"/>
<int:mapping exception-type="java.lang.NullPointerException"
channel="npeChannel"/>
</int:exception-type-router>
<int:channel id="illegalChannel" />
<int:channel id="npeChannel" />
配置通用路由器
Spring Integration 提供了一个通用的路由器。 你可以用它做通用路由(不同于 Spring Integration 提供的其他路由器,后者都有某种专业化)。
使用 XML 配置基于内容的路由器
这路由器element 提供了将路由器连接到输入通道的方法,也接受可选的默认输出通道属性。
这裁判属性引用自定义路由器实现的 BEAN 名称(该实现必须扩展摘要消息路由器).
以下示例展示了三种通用路由器:
<int:router ref="payloadTypeRouter" input-channel="input1"
default-output-channel="defaultOutput1"/>
<int:router ref="recipientListRouter" input-channel="input2"
default-output-channel="defaultOutput2"/>
<int:router ref="customRouter" input-channel="input3"
default-output-channel="defaultOutput3"/>
<beans:bean id="customRouterBean" class="org.foo.MyCustomRouter"/>
或者裁判可能指向包含@Router注释(后文显示),或者你可以结合裁判并带有显式方法名称。
指定方法时,应用与以下描述的行为相同@Router注释部分,见本文档后面。
以下示例定义了一个指向其 POJO 的路由器裁判属性:
<int:router input-channel="input" ref="somePojo" method="someMethod"/>
我们通常建议使用裁判如果自定义路由器实现在其他中被引用,属性<路由器>定义。
然而,如果自定义路由器实现应将作用域限定为单一定义<路由器>你可以给出一个内部的“豆”定义,如下示例所示:
<int:router method="someMethod" input-channel="input3"
default-output-channel="defaultOutput3">
<beans:bean class="org.foo.MyCustomRouter"/>
</int:router>
同时使用裁判属性和内部处理程序定义在同一个<路由器>不允许配置。
这样做会产生歧义条件并抛出异常。 |
如果裁判属性指代扩展的豆子摘要消息制作处理程序(例如框架本身提供的路由器),配置优化为直接引用路由器。
在这种情况下,每一个裁判属性必须指向一个独立的豆实例(或原型-有瞄准镜的豆子)或使用内层<豆/>配置类型。
然而,这种优化仅适用于路由器XML定义中未提供任何路由器特有属性。
如果你无意中引用了多个 Beans 中的同一个消息处理程序,就会出现配置异常。 |
以下示例展示了用 Java 配置的对应路由器:
@Bean
@Router(inputChannel = "routingChannel")
public AbstractMessageRouter myCustomRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
return // determine channel(s) for message
}
};
}
以下示例展示了使用Java DSL配置的等效路由器:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.route(myCustomRouter())
.get();
}
public AbstractMessageRouter myCustomRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
return // determine channel(s) for message
}
};
}
或者,你也可以根据消息有效载荷的数据进行路由,如下示例所示:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.route(String.class, p -> p.contains("foo") ? "fooChannel" : "barChannel")
.get();
}
路由器与Spring表达式语言(SpEL)
有时,路由逻辑可能很简单,为它写一个独立类并配置成豆子可能显得有些大材小用。 自 Spring Integration 2.0 起,我们提供了一种替代方案,允许你使用 SpEL 实现以前需要自定义 POJO 路由器的简单计算。
| 有关 Spring 表达式语言的更多信息,请参见 Spring 框架参考指南中的相关章节。 |
通常,会评估一个SpEL表达式,并将结果映射到信道,如下示例所示:
<int:router input-channel="inChannel" expression="payload.paymentType">
<int:mapping value="CASH" channel="cashPaymentChannel"/>
<int:mapping value="CREDIT" channel="authorizePaymentChannel"/>
<int:mapping value="DEBIT" channel="authorizePaymentChannel"/>
</int:router>
以下示例展示了用 Java 配置的对应路由器:
@Router(inputChannel = "routingChannel")
@Bean
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter("payload.paymentType");
router.setChannelMapping("CASH", "cashPaymentChannel");
router.setChannelMapping("CREDIT", "authorizePaymentChannel");
router.setChannelMapping("DEBIT", "authorizePaymentChannel");
return router;
}
以下示例展示了Java DSL中配置的等效路由器:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.route("payload.paymentType", r -> r
.channelMapping("CASH", "cashPaymentChannel")
.channelMapping("CREDIT", "authorizePaymentChannel")
.channelMapping("DEBIT", "authorizePaymentChannel"))
.get();
}
为了更简化,SpEL表达式可以计算为通道名称,如下表达式所示:
<int:router input-channel="inChannel" expression="payload + 'Channel'"/>
在前述配置中,结果通道通过SpEL表达式计算,该表达式将有效载荷字面意义上的字符串, “频道”。
SpEL在配置路由器时的另一个优点是表达式可以返回收集,实际上使<路由器>一个接收者列表路由器。
每当表达式返回多个信道值时,消息会转发到每个信道。
以下示例展示了这样的表达式:
<int:router input-channel="inChannel" expression="headers.channels"/>
在上述配置中,如果消息包含一个名为“通道”的头部,且该头部的值为列表信道名称中,消息发送到列表中的每个信道。
当你需要选择多个通道时,你可能会发现收藏投影和收藏选择表达式很有用。
更多信息请参见:
带注释配置路由器
使用@Router要注释方法,该方法可以返回消息频道或者字符串类型。
在后一种情况下,端点会像解析默认输出信道一样解析信道名称。
此外,该方法可以返回单个值或集合。
如果回收了集合,回复消息会发送到多个通道。
总结来说,以下方法签名均有效:
@Router
public MessageChannel route(Message message) {...}
@Router
public List<MessageChannel> route(Message message) {...}
@Router
public String route(Foo payload) {...}
@Router
public List<String> route(Foo payload) {...}
除了基于有效载荷的路由外,消息还可以根据消息头部中作为属性或属性提供的元数据进行路由。
在这种情况下,一个注释为@Router可能包含一个标注为@Header,如下例所示,该值被映射到头部值,并在注释支持中有所说明:
@Router
public List<String> route(@Header("orderStatus") OrderStatus status)
| 关于基于XML的消息路由,包括XPath支持,请参见XML支持——处理XML有效载荷。 |
另请参阅Java DSL章节中的“消息路由器”,了解更多关于路由器配置的信息。
动态布线器
Spring Integration为常见的内容型路由场景提供了多种不同的路由器配置,并支持将自定义路由器作为POJO实现的选项。
例如有效载荷类型路由器提供了一种简单的方法来配置路由器,根据收到消息的有效载荷类型计算信道,且头值路由器在配置路由器时提供了同样的便利,该路由器通过计算特定消息头部的值来计算信道。
还有基于表达式(SpEL)的路由器,其通道是通过计算表达式确定的。
所有这些类型的布线器都表现出一些动态特性。
然而,这些路由器都需要静态配置。 即使是基于表达式的路由器,表达式本身也被定义为路由器配置的一部分,这意味着同一表达式在相同值下作时,总是计算出相同的通道。 这在大多数情况下是可以接受的,因为这些路线定义明确且可预测。 但有时我们需要动态更改路由器配置,以便消息流被路由到不同的频道。
例如,你可能想关闭系统的某部分进行维护,并暂时将消息重新路由到另一个消息流。
再举个例子,你可能想通过增加另一条路径来处理更具体的java.lang.Number(在有效载荷类型路由器).
不幸的是,静态路由器配置要实现这两个目标,你必须关闭整个应用,更改路由器配置(更改路由),然后再重新启用应用。 显然,这不是任何人想要的解决方案。
动态路由器模式描述了如何动态更改或配置路由器而不导致系统或单个路由器瘫痪的机制。
在我们具体介绍 Spring Integration 如何支持动态路由之前,我们需要先考虑路由器的典型流程:
-
计算通道标识符,这是路由器收到消息后计算的值。 通常,它是字符串或实际的实例
消息频道. -
将通道标识符解析为通道名称。 我们在本节后面将详细介绍这一过程。
-
将频道名称解析为实际
消息频道
如果第1步结果是实际的消息频道,因为消息频道是任何路由器工作的最终产物。
然而,如果第一步得到的通道标识符不是消息频道,你有相当多种可能的方式影响推导消息频道.
请考虑以下有效载荷型路由器的示例:
<int:payload-type-router input-channel="routingChannel">
<int:mapping type="java.lang.String" channel="channel1" />
<int:mapping type="java.lang.Integer" channel="channel2" />
</int:payload-type-router>
在有效载荷型路由器的背景下,上述三个步骤具体实现如下:
-
计算一个通道标识符,即有效载荷类型的完全限定名称(例如,
java.lang.字符串). -
将通道标识符解析为通道名称,利用前一步的结果从定义的有效载荷类型映射中选择合适的值,
映射元素。 -
将通道名称解析为
消息频道作为应用上下文中的豆子的引用(希望是消息频道)由前一步的结果识别。
换句话说,每一步都会传递到下一步,直到过程完成。
现在考虑一个头值路由器的例子:
<int:header-value-router input-channel="inputChannel" header-name="testHeader">
<int:mapping value="foo" channel="fooChannel" />
<int:mapping value="bar" channel="barChannel" />
</int:header-value-router>
现在我们可以考虑头值路由器的三个步骤:
-
计算一个通道标识符,即由
首部名称属性。 -
将通道标识符解析为通道名称,利用前一步的结果从定义的通用映射中选择合适的值
映射元素。 -
将通道名称解析为
消息频道作为应用上下文中的豆子的引用(希望是消息频道)由前一步的结果识别。
前两种不同路由器类型的配置看起来几乎一模一样。
然而,如果你看看其他配置的头值路由器我们清楚地看到,没有映射子元素,如下列表所示:
<int:header-value-router input-channel="inputChannel" header-name="testHeader">
不过,这个配置依然完全有效。 那么自然的问题是,第二步的映射怎么办?
第二步现在是可选的。
如果映射未定义,则第一步计算的通道标识符值自动被视为频道名称,现在被解析为实际的消息频道,也就是第三步。
这也意味着第二步是为路由器提供动态特性的关键步骤之一,因为它引入了一个过程,可以更改通道标识符对通道名称的解析方式,从而影响确定最终实例的过程消息频道根据初始频道标识符。
例如,在上述配置中,假设测试头值为“kermit”,现在是通道标识符(第一步)。
由于该路由器没有映射,将该信道标识符解析为信道名称(第二步)是不可能的,因此该信道标识符现在被视为信道名称。
但是,如果存在一个映射,但取值不同呢?
最终结果依然相同,因为如果通过将通道标识符解析为通道名称的过程无法确定新值,通道标识符就会成为通道名称。
剩下的就是第三步将通道名(“kermit”)解析为消息频道以此名称识别。
这基本上就是对所提供的名字进行“豆子”查询。
现在,所有包含头部-值对的消息为testHeader=kermit将被引导到一个消息频道其豆名(其身份证)是“kermit”。
但如果你想把这些消息转发到“辛普森”频道呢?显然,更改静态配置是可行的,但这也需要降低你的系统。
不过,如果你已经访问了信道标识符映射,可以引入一个新的映射,将头值对变成现在Kermit=Simpson。,因此第二步将“kermit”作为通道标识符,同时将其解析为“simpson”作为通道名称。
显然,这同样适用于有效载荷类型路由器,现在你可以重新映射或移除特定类型的有效载荷映射。
事实上,它适用于所有其他路由器,包括基于表达式的路由器,因为它们计算出的值现在有机会经过第二步,最终被解析为实际频道名称.
任何属于摘要地图消息路由器(包括大多数框架定义的布线器)是动态布线器,因为频道映射定义在摘要地图消息路由器水平。
该地图的设置器方法作为公开方法暴露,与“setChannelMapping”和“removeChannelMapping”方法一起。
只要你有路由器本身的参考,这些工具允许你在运行时更改、添加和移除路由器映射。
这也意味着你可以通过 JMX(参见 JMX 支持)或 Spring Integration 控制总线(参见控制总线)功能来暴露这些相同的配置选项。
回归信道密钥,因为信道名称灵活且方便。
然而,如果你不信任消息创建者,恶意行为者(了解系统)可能会创建消息,并将其路由到意想不到的渠道。
例如,如果密钥设置为路由器输入通道的通道名称,这样的消息会被路由回路由器,最终导致堆栈溢出错误。
因此,您可能希望关闭此功能(设置channelKeyFallback属性到false),并在需要时更改映射。 |
使用控制总线管理路由器映射
管理路由器映射的一种方法是控制总线模式,它会暴露一个控制通道,你可以通过它发送控制消息,管理和监控 Spring Integration 组件,包括路由器。
| 有关控制总线的更多信息,请参见控制总线。 |
通常,你会发送一个控制消息,要求对某个受管理组件(例如路由器)调用特定作。 以下管理作(方法)专门用于更改路由器解析过程:
-
public void setChannelMapping(String key, String channelName): 允许你添加新的或修改现有映射信道标识符和频道名称 -
public void removeChannelMapping(String key)允许您移除特定的通道映射,从而切断信道标识符和频道名称
请注意,这些方法可用于简单的更改(例如更新单一路由或添加或删除一条路由)。 但是,如果你想移除一条路由并添加另一条,更新不是原子式的。 这意味着路由表在更新之间可能处于不确定状态。 从4.0版本开始,你现在可以用控制总线原子更新整个路由表。 以下方法可以让你做到这一点:
-
public Map<String, String>getChannelMappings()返回当前映射。 -
公共空置替换通道映射(属性通道映射): 更新映射。 注意频道映射参数是性能对象。 这种布局使控制总线命令能够使用内置的StringToProperties转换器,如下示例所示:
"@'router.handler'.replaceChannelMappings('foo=qux \n baz=bar')"
注意每个映射之间都用一个换行字符分隔(\n).
对于对地图进行程序性更改,我们建议您使用setChannelMappings由于类型安全考虑,方法。replaceChannelMappings忽略不存在的键或值字符串对象。
通过使用 JMX 管理路由器映射
你也可以使用 Spring 的 JMX 支持来暴露一个路由器实例,然后用你喜欢的 JMX 客户端(例如 JConsole)来管理这些作(方法),以便更改路由器配置。
| 有关 Spring Integration 的 JMX 支持的更多信息,请参见 JMX 支持。 |
走线滑移
从4.1版本开始,Spring Integration提供了路由单点企业集成模式的实现。
它被实现为路由滑道消息头部,用于确定下一个信道摘要消息制作处理程序当输出通道端点未指定。
这种模式在复杂、动态的情况下非常有用,因为配置多个路由器以确定消息流会变得困难。
当消息到达一个端点时,没有输出通道这路由滑道被咨询以确定消息发送到的下一个通道。
当走线滑动耗尽时,正常回复频道处理简历。
路由单的配置以HeaderEnricher选项——一个分号分隔的路由滑脱,包含以下内容路径如下示例所示:
<util:properties id="properties">
<beans:prop key="myRoutePath1">channel1</beans:prop>
<beans:prop key="myRoutePath2">request.headers[myRoutingSlipChannel]</beans:prop>
</util:properties>
<context:property-placeholder properties-ref="properties"/>
<header-enricher input-channel="input" output-channel="process">
<routing-slip
value="${myRoutePath1}; @routingSlipRoutingPojo.get(request, reply);
routingSlipRoutingStrategy; ${myRoutePath2}; finishChannel"/>
</header-enricher>
上述例子如下:
-
一个
<上下文:property-placeholder>配置以证明路由单条中的条目路径可以指定为可解析键。 -
这
<header-enricher><路线滑脱>子元素用于填充路由滑头值消息处理器前往HeaderEnricher处理器。 -
这
路由滑头值消息处理器接受字符串解析路由滑移阵列路径条目与返回(来自)processMessage())singletonMap其中路径如钥匙和0作为首字母路由SlipIndex.
走线滑移路径条目可以包含消息频道Beans名字,路由支线策略豆名和春季表达式(SpEL)。
这路由滑头值消息处理器检查每个路由滑移路径针对豆子工厂在1号processMessage调用。
它将条目(在应用上下文中不是豆名)转换为表达式评估路由滑行路线策略实例。路由支线策略条目会被多次调用,直到返回空或空字符串.
由于路由滑移涉及getOutputChannel过程,我们有一个请求-回复上下文。
这路由支线策略被引入以确定下一个输出通道该 使用请求消息以及答对象。
该策略的实现应在应用上下文中注册为豆子,其豆子名称在路由单中使用路径.
这表达式评估路由滑行路线策略提供实现。
它接受 SpEL 表达式和内部表达式ExpressionEvaluatingRoutingSlipRouteStrategy.RequestAndReply对象被用作评估上下文的根对象。
这是为了避免评价背景每个ExpressionEvaluatingRoutingSlipRouteStrategy.getNextPath()调用。
它是一个简单的 Java 豆,具有两个属性:请求信息<?>和客体回复.
通过该表达式实现,我们可以指定路由滑移路径通过使用 SpEL 进行的条目(例如,@routingSlipRoutingPojo.get(请求,回复)和request.headers[myRoutingSlipChannel])并避免为路由支线策略.
这请求消息论元总是留言<?>.
根据上下文,回复对象可能是留言<?>一摘要集成信息构建器,或任意应用域对象(例如,当服务激活器调用的POJO方法返回时)。
在前两种情况下,通常消息性质(有效载荷和头)在使用 SpEL(或 Java 实现)时可以使用。
对于任意域对象,这些属性不可用。
因此,当你将路由单与POJO方法结合使用时,如果结果被用来确定下一条路径,请谨慎。 |
如果分布式环境中涉及路由滑移,我们建议不要使用内联表达式来表示路由滑移路径.
该建议适用于分布式环境,如跨JVM应用,使用以下条件请求-回复通过消息代理(如AMQP支持或JMS支持),或使用持久化程序消息商店 (消息存储)在整合流程中。
该框架使用路由滑头值消息处理器将其转换为表达式评估路由滑行路线策略对象,它们被用于路由滑道消息头部。
因为该类不是序列 化(不可能,因为它取决于豆子工厂),整个消息变得不可序列化,在任意分布式作中,我们得到NotSerializableException.
为克服此限制,注册为表达式评估路由滑行路线策略带有所需SpEL的豆子,并在路由单中使用其豆名路径配置。 |
对于 Java 配置,你可以添加一个路由滑头值消息处理器实例到HeaderEnricher豆子定义,如下示例所示:
@Bean
@Transformer(inputChannel = "routingSlipHeaderChannel")
public HeaderEnricher headerEnricher() {
return new HeaderEnricher(Collections.singletonMap(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
new RoutingSlipHeaderValueMessageProcessor("myRoutePath1",
"@routingSlipRoutingPojo.get(request, reply)",
"routingSlipRoutingStrategy",
"request.headers[myRoutingSlipChannel]",
"finishChannel")));
}
当端点发出回复且否时,路由滑移算法的工作原理如下输出通道定义如下:
-
这
路由SlipIndex用于从路由票中获得一个数值路径列表。 -
如果 的值
路由SlipIndex是字符串,它被用来获得豆子豆子工厂. -
如果返回的豆子是 的实例
消息频道,它被用作下一个输出通道以及路由SlipIndex在回复消息头中被递增(路由单条路径条目保持不变)。 -
如果返回的豆子是 的实例
路由支线策略以及其获取下一路径不返回空字符串,该结果被用作下一个的豆名输出通道. 这路由SlipIndex保持不变。 -
如果
RoutingSlipRouteStrategy.getNextPath返回空字符串或零这路由SlipIndex是递增的,且getOutputChannelFromRoutingSlip在下一个路由滑移时递归调用路径项目。 -
如果下一次路由滑脱
路径条目不是字符串,它必须是 的实例路由支线策略. -
当
路由SlipIndex超过了走线滑差的大小路径列表,算法会切换到标准的默认行为回复频道页眉。
流程管理器企业集成模式
企业集成模式包括流程管理器模式。
你现在可以通过封装在路由支线策略在路由单上。
除了豆子名字外,路由支线策略可以返回任意消息频道且没有要求必须这样做消息频道实例在应用上下文中是豆子。
这样,当无法预测应使用哪个信道时,我们可以提供强大的动态路由逻辑。
一个消息频道可以在路由支线策略然后回来了。
一个固定订阅频道与消息处理器实施是这类情况的良好组合。
例如,你可以路由到响应式流,如下示例所示:
@Bean
public PollableChannel resultsChannel() {
return new QueueChannel();
}
@Bean
public RoutingSlipRouteStrategy routeStrategy() {
return (requestMessage, reply) -> requestMessage.getPayload() instanceof String
? new FixedSubscriberChannel(m ->
Mono.just((String) m.getPayload())
.map(String::toUpperCase)
.subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)))
: new FixedSubscriberChannel(m ->
Mono.just((Integer) m.getPayload())
.map(v -> v * 2)
.subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)));
}
Filter
消息过滤器用于判断消息应根据某些条件传递或放弃,比如消息头值或消息内容本身。
因此,消息过滤器类似于路由器,不同之处在于,对于从过滤输入通道接收到的每条消息,该消息可能被发送到过滤器的输出通道,也可能不会。
与路由器不同,它不会决定将消息发送到哪个信道,只决定是否发送消息。
| 正如我们本节后面所述,Filter还支持丢弃通道。 在某些情况下,它可以作为一个非常简单的路由器(或“交换机”),基于布尔条件。 |
在 Spring Integration 中,你可以将消息过滤器配置为消息端点,委派给消息选择器接口。
该界面本身相当简单,如下列表所示:
public interface MessageSelector {
boolean accept(Message<?> message);
}
这消息过滤构造函数接受选择器实例,如下示例所示:
MessageFilter filter = new MessageFilter(someSelector);
结合命名空间和 SpEL,你可以用很少的 Java 代码配置强大的过滤器。
使用 XML 配置过滤器
你可以使用<Filter>元素用于创建消息选择端点。
此外输入通道和输出通道属性,它需要一个裁判属性。
这裁判可以指向消息选择器实现方式,如下示例所示:
<int:filter input-channel="input" ref="selector" output-channel="output"/>
<bean id="selector" class="example.MessageSelectorImpl"/>
或者,你也可以添加方法属性。
在这种情况下,裁判属性可以指代任何对象。
引用的方法可能期望消息输入消息的类型或有效载荷类型。
该方法必须返回一个布尔值。
如果方法返回“true”,消息会发送到输出信道。
以下示例展示了如何配置使用以下条件的过滤器方法属性:
<int:filter input-channel="input" output-channel="output"
ref="exampleObject" method="someBooleanReturningMethod"/>
<bean id="exampleObject" class="example.SomeObject"/>
如果选择器或自适应的POJO方法返回false,有几个设置控制被拒绝消息的处理。
默认情况下(如前例配置)拒绝消息会被静默丢弃。
如果拒绝结果应导致错误,则设拒绝时抛出异常归属为true,如下示例所示:
<int:filter input-channel="input" ref="selector"
output-channel="output" throw-exception-on-rejection="true"/>
如果你希望被拒绝的消息被路由到特定信道,请提供该引用作为丢弃通道,如下示例所示:
<int:filter input-channel="input" ref="selector"
output-channel="output" discard-channel="rejectedMessages"/>
| 消息过滤器通常与发布-订阅频道配合使用。 许多过滤端点可能订阅同一个通道,它们决定是否将消息传递给下一个端点,该端点可以是任何支持的类型(如服务激活器)。 这为使用拥有单点对点输入通道和多个输出通道的消息路由器提供了一种被动替代方案。 |
我们建议使用裁判如果自定义Filter实现在其他地方被引用,属性<Filter>定义。
然而,如果自定义Filter的实现作用域仅为单一<Filter>元素,你应该提供一个内部的豆子定义,如下示例所示:
<int:filter method="someMethod" input-channel="inChannel" output-channel="outChannel">
<beans:bean class="org.foo.MyCustomFilter"/>
</filter>
同时使用裁判属性和内部处理程序定义在同一个<Filter>不允许配置,因为这会产生歧义条件并抛出异常。 |
如果裁判属性指代扩展的豆子消息过滤(例如框架本身提供的Filter),该配置通过直接将输出通道注入滤波豆进行优化。
在这种情况下,每一个裁判必须是独立的豆子实例(或原型-有瞄准镜的豆子)或使用内层<豆/>配置类型。
然而,这种优化仅适用于你在筛选 XML 定义中没有提供任何特定过滤器属性的情况。
如果你无意中引用了多个 Beans 中的同一个消息处理程序,就会出现配置异常。 |
随着 SpEL 支持的引入,Spring Integration 增加了表达属性映射到滤波元件。
它可以完全避免简单过滤器的 Java,如下示例所示:
<int:filter input-channel="input" expression="payload.equals('nonsense')"/>
作为表达属性值传递的字符串作为 SpEL 表达式被评估,消息可在评估上下文中获得。
如果你必须在应用上下文的范围内包含表达式的结果,你可以使用SpEL参考文档中定义的符号,如下示例所示:#{}
<int:filter input-channel="input"
expression="payload.matches(#{filterPatterns.nonsensePattern})"/>
如果表达式本身需要动态,可以使用“表达式”子元素。
这为通过密钥解析表达式提供了一层间接的ExpressionSource.
这是一个你可以直接实现的策略界面,或者你可以依赖 Spring Integration 中提供的版本,它会从“资源包”加载表达式,并在给定秒后检查修改情况。
所有这些都在下面的配置示例中得到展示,如果底层文件被修改,表达式可以在一分钟内重新加载:
<int:filter input-channel="input" output-channel="output">
<int:expression key="filterPatterns.example" source="myExpressions"/>
</int:filter>
<beans:bean id="myExpressions"
class="o.s.i.expression.ReloadableResourceBundleExpressionSource">
<beans:property name="basename" value="config/integration/expressions"/>
<beans:property name="cacheSeconds" value="60"/>
</beans:bean>
如果ExpressionSource豆子被命名表达来源,你无需在<表情>元素。 然而,在前面的例子中,我们为了完整性而展示它。
“config/integration/expressions.properties”文件(或任何带有区域扩展的更具体版本,需要以典型的资源包加载方式解析)可以包含键值对,如下示例所示:
filterPatterns.example=payload > 100
所有这些例子都使用表达作为属性或子元素,也可以应用于变换器、路由器、分流器、服务激活器和头部丰富元素中。给定组件类型的语义和角色会影响评估结果的解释,就像方法调用的返回值被解释一样。例如,一个表达式可以返回路由器组件应视为消息通道名称的字符串。然而,在Spring Integration中,将表达式作为根对象并解析带有“@”前缀的豆名,这些基本功能在Spring Integration中的所有核心EIP组件中是一致的。 |
带注释的过滤器配置
以下示例展示了如何通过注释配置过滤器:
public class PetFilter {
...
@Filter (1)
public boolean dogsOnly(String input) {
...
}
}
| 1 | 注释说明该方法将用作过滤器。如果该类被用作过滤器,必须明确说明。 |
XML 元素提供的所有配置选项也适用于@Filter注解。
过滤器可以明确引用 XML 或@MessageEndpoint注释定义在类上,通过类路径扫描自动检测。
另见《使用注释建议端点》。
分配器
分流器是一个组件,其职责是将消息划分为多个部分,并将生成的消息独立发送以供处理。它们通常是包含聚合器的管道中的上游生产者。
编程模型
用于执行拆分的 API 由一个基类组成,摘要消息分流器. 它是消息处理器实现了包含分流器常见特性的实现,例如填充相应的消息头部(CORRELATION_ID,SEQUENCE_SIZE和SEQUENCE_NUMBER)对所产生的消息进行。这种填充方式使得追踪消息及其处理结果成为可能(在典型情况下,这些头部会被复制到各个转换端点产生的消息中)。这些值随后可以被例如组合后的消息处理器使用。
以下示例展示了以下摘录摘要消息分流器:
public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);
}
要在应用中实现特定的分配器,可以扩展摘要消息分流器并实现分裂消息方法,包含用于拆分消息的逻辑。返回值可以是以下之一:
-
一个
收集或消息数组,或可迭代(或迭 代)对消息进行迭代。在这种情况下,消息作为消息发送(在CORRELATION_ID,SEQUENCE_SIZE和SEQUENCE_NUMBER填充了)。采用这种方法可以让你有更多控制力——例如,在拆分过程中填充自定义消息头。 -
一个
收集或非消息对象数组,或可迭代(或迭 代)对非消息对象进行迭代。它的工作原理与前述类似,只是每个集合元素都作为消息负载使用。采用这种方法可以让你专注于领域对象,而无需考虑消息系统,并且生成的代码更易于测试。 -
一个
消息或非消息对象(但不是集合或数组)。它的工作原理与前述类似,只是发送一条消息。
在 Spring Integration 中,任何 POJO 都可以实现拆分算法,前提是它定义了一个接受单个参数且返回值的方法。在这种情况下,方法的返回值被解释为前述。输入参数可以是消息或者简单的 POJO。在后一种情况下,分路器接收到收到消息的有效载荷。我们推荐这种方法,因为它将代码与 Spring 集成 API 解耦,通常更容易测试。
迭代器
从4.1版本开始,摘要消息分流器支持迭 代类型值拆分。注意,在迭 代(或可迭代),我们无法访问底层项目的数量,且SEQUENCE_SIZE首部设置为0. 这意味着默认情况下序列大小释放策略一<聚合器>这行不通,而这个小组的CORRELATION_ID来自分配器不会被释放;它将保持为不完全的. 在这种情况下,你应该使用合适的自定义发布策略或者依赖发送部分结果于到期时䋰小组暂停或者MessageGroupStoreReaper.
从5.0版本开始,摘要消息分流器提供受保护的获得尺寸如果可能()能够确定可迭代和迭 代如果可能的话,物体。 例如XPathMessageSplitter可以确定标的资产的大小节点列表对象。 从5.0.9版本开始,该方法也会正确返回com.fasterxml.jackson.core.TreeNode.
一迭 代对象有助于避免在拆分前在内存中构建整个集合。例如,当底层项目从外部系统(例如数据库或FTP)填充时MGET)通过迭代或流。
用XML配置分路器
分线器可以通过XML配置如下:
<int:channel id="inputChannel"/>
<int:splitter id="splitter" (1)
ref="splitterBean" (2)
method="split" (3)
input-channel="inputChannel" (4)
output-channel="outputChannel" (5)
discard-channel="discardChannel" /> (6)
<int:channel id="outputChannel"/>
<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
| 1 | 分线器的ID是可选的。 |
| 2 | 在应用上下文中定义的豆子的引用。豆子必须实现前文所述的拆分逻辑。 自选。 如果没有提供对豆子的引用,则假设到达消息的有效载荷是输入通道是 的实现java.util.Collection默认的拆分逻辑应用于集合,将每个单独元素整合进消息并发送给输出通道. |
| 3 | 实现拆分逻辑的方法(定义在豆子上)。 自选。 |
| 4 | 分线器的输入通道。 必填。 |
| 5 | 分路器将分拆消息结果发送到的信道。可选(因为进消息可以指定回复信道)。 |
| 6 | 在分段结果为空时,请求消息发送到的信道。可选(如零结果)。 |
我们建议使用裁判如果自定义分路器实现可以被引用到其他<分线器>定义。 然而,如果自定义分流器处理器的实现应将作用域限定为<分线器>你可以配置一个内层豆定义,如下示例:
<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
同时使用裁判属性和内部处理程序定义在同一个<智力:分裂器>不允许配置,因为这会造成歧义并导致异常抛出。 |
如果裁判属性指代扩展的豆子摘要消息制作处理程序(例如框架本身提供的分路器),配置通过直接将输出通道注入处理程序进行优化。在这种情况下,每个裁判必须是一个独立的豆子实例(或原型-有瞄准镜的豆子)或使用内层<豆/>配置类型。然而,该优化仅适用于在分配器XML定义中未提供任何分路器特定的属性。如果你无意中引用了多个BEANS中的同一个消息处理程序,就会出现配置异常。 |
聚合
基本上,聚合器是分路器的镜像,是一种消息处理程序,接收多个消息并将其合并为一条消息。 事实上,聚合器通常是包含分路器的管道中的下游消费者。
从技术上讲,聚合器比分路器更复杂,因为它是有状态的。
它必须保存待聚合的消息,并判断整组消息何时准备好聚合。
为此,需要消息商店.
功能性
聚合器通过关联和存储一组相关消息,直到该组被认为完整。 此时,聚合器通过处理整个组生成一条消息,并将聚合后的消息作为输出发送。
实现聚合器需要提供逻辑来执行聚合(即从多个消息中创建一个单一消息)。 两个相关的概念是相关性和释放。
相关性决定了消息如何分组以便聚合。
在春季积分中,相关性默认基于IntegrationMessageHeaderAccessor.CORRELATION_ID消息头部。
与同一条信息IntegrationMessageHeaderAccessor.CORRELATION_ID被归为一组。
不过,你可以自定义相关策略,允许其他方式指定消息的分组方式。
为此,你可以实现相关策略(本章后面将详细介绍)
为了确定一组消息何时准备好处理,需要发布策略被咨询。
聚合器的默认发布策略是在序列中包含的所有消息都存在时,基于IntegrationMessageHeaderAccessor.SEQUENCE_SIZE页眉。
你可以通过提供自定义的引用来覆盖这个默认策略发布策略实现。
编程模型
聚合API由若干类组成:
-
界面
消息组处理器,及其子类:MethodInvokingAggregatingMessageGroupProcessor和ExpressionEvaluatingMessageGroupProcessor -
这
发布策略界面及其默认实现:简单序列大小释放策略 -
这
相关策略界面及其默认实现:头部属性相关策略
聚合消息处理器
这聚合消息处理器(一个子类摘要相关消息处理)是消息处理器实现,封装聚合器的共同功能(及其他相关用例),具体如下:
-
将消息关联到待聚合的组中
-
在
消息商店直到该团体能够被释放 -
决定何时可以释放该小组
-
将释放的组合并成一条消息
-
识别并响应过期组
决定如何将消息分组的责任委托给了相关策略实例。
决定消息组是否可以释放的责任被委托给发布策略实例。
以下列表展示了基地的简要亮点摘要聚合消息组处理器(实施聚合有效载荷方法由开发者自行决定):
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
看DefaultAggregatingMessageGroupProcessor,ExpressionEvaluatingMessageGroupProcessor和MethodInvokingMessageGroupProcessor作为开箱即用的摘要聚合消息组处理器.
从版本5.2开始,Function<MessageGroup, Map<String, Object>>策略适用于摘要聚合消息组处理器用于合并并计算输出消息的(聚合)头部。
这默认聚合首部函数实现方式支持返回所有组内无冲突的头部;在组内一个或多个消息中缺少头部不被视为冲突。
冲突的头部被省略。
以及新引入的授权MessageGroupProcessor,该函数用于任意(非-摘要聚合消息组处理器) 消息组处理器实现。
本质上,该框架将提供的函数注入到摘要聚合消息组处理器实例,并将所有其他实现封装为授权MessageGroupProcessor.
逻辑上的差异摘要聚合消息组处理器以及授权MessageGroupProcessor后者在调用代理策略前不会提前计算头部,且如果代理返回 a消息或摘要集成信息构建器.
在这种情况下,框架假设目标实现已经生成了一组合适的头部,并填充到返回的结果中。
这Function<MessageGroup, Map<String, Object>>策略可作为标题函数XML 配置的参考属性,作为AggregatorSpec.headersFunction()Java DSL 的选项,以及AggregatorFactoryBean.setHeadersFunction()用于纯 Java 配置。
这相关策略属于摘要相关消息处理且 的默认值基于IntegrationMessageHeaderAccessor.CORRELATION_ID消息头,如下示例所示:
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}
至于消息组的实际处理,默认实现是DefaultAggregatingMessageGroupProcessor.
它会创造一个单一消息其有效载荷为列表该组接收的有效载荷数量。
这对于简单的分散收集实现(如分路器、发布-订阅通道或上游接收者列表路由器)效果良好。
在使用发布-订阅频道或接收者列表路由器时,务必启用应用序列旗。
这样做会添加必要的头部:CORRELATION_ID,SEQUENCE_NUMBER和SEQUENCE_SIZE.
在 Spring Integration 中,分线器默认启用了该行为,但发布-订阅信道或接收者列表路由器则未启用,因为这些组件可能在多种情况下使用,这些头部并不必要。 |
在为应用实施特定的聚合策略时,你可以扩展摘要聚合消息组处理器并实现聚合有效载荷方法。
然而,有更好的解决方案,更少依赖于API,用于实现聚合逻辑,这些逻辑可以通过XML或注释进行配置。
一般来说,任何POJO如果提供一种接受单一java.util.List作为一个论元(参数化列表也被支持)。
该方法用于聚合消息,具体如下:
-
如果该参数是
java.util.Collection<T>参数类型T可分配给消息,所有为聚合而累积的消息列表都会发送给聚合器。 -
如果参数是非参数化的
java.util.Collection或者参数类型不可分配于消息,该方法接收累积消息的有效载荷。 -
如果返回类型不可分配于
消息,它被视为 的有效载荷消息这由框架自动创建。
| 为了简化代码并推广低耦合、可检验性等最佳实践,首选实现聚合逻辑的方式是通过 POJO 并在应用中使用 XML 或注释支持进行配置。 |
从5.3版本开始,处理消息组后,摘要相关消息处理执行MessageBuilder.popSequenceDetails()针对具有多个嵌套层级的适当分路器-聚合器场景进行消息头修改。
只有当消息组释放结果不是消息集合时才会这样做。
那就是目标消息组处理器负责MessageBuilder.popSequenceDetails()在建立这些信息的同时打电话。
如果消息组处理器返回 a消息一个MessageBuilder.popSequenceDetails()只有当序列细节匹配了群组中第一个消息。
(此前仅在普通有效载荷或摘要集成信息构建器已从中归还消息组处理器.)
该功能可以通过新的popSequence 布尔性质,因此MessageBuilder.popSequenceDetails()当标准分配器未填充相关细节时,可关闭。
这一特性本质上抵消了最近上游的做法应用序列 = 真在摘要消息分流器.
更多信息请参见Splitter。
这SimpleMessageGroup.getMessages()方法返回不可修改的集合.
因此,如果聚合POJO方法具有收藏<信息>参数,传递的参数正是收集实例和,当你使用一个简易消息商店对于聚合器来说,原始的收藏<信息>释放团队后被清除。
因此,收藏<信息>如果 POJO 中的变量是从聚合器传递出来的,也被清除。
如果你想直接发布该收藏以便进一步处理,必须重新构建一个新的收集(例如,新 ArrayList<Message>(messages)).
从4.3版本开始,框架不再将消息复制到新集合,以避免不必要的额外对象创建。 |
在4.2版本之前,无法提供消息组处理器通过使用 XML 配置。
只有POJO方法可用于聚合。
现在,如果框架检测到被引用的(或内部)豆实现了消息处理器它被用作聚合器的输出处理器。
如果你想从自定义中发布一组对象消息组处理器作为消息的有效载荷,你的类应扩展摘要聚合消息组处理器并实现聚合有效载荷().
此外,自4.2版本起,aSimpleMessageGroup处理器提供。
它返回来自该组的消息集合,如前所述,这会导致释放的消息被单独发送。
这使得聚合器能够作为消息屏障,将到达的消息保留,直到发布策略触发,并将该组作为一系列单独消息释放。
从6.0版本开始,上述拆分行为仅在组处理器为SimpleMessageGroup处理器.
否则,和其他任何角色消息组处理器返回收藏<信息>,只发出一条回复消息,所有消息集合作为有效载荷。
这种逻辑由聚合器的典型目的所决定——通过某个密钥收集请求消息并生成单一分组消息。
发布策略
这发布策略接口定义如下:
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}
一般来说,任何 POJO 只要提供一个接受单一java.util.List作为参数(也支持参数化列表),并返回一个布尔值。
该方法在每条新消息到达后调用,以判断该组是否完整,具体如下:
-
如果该参数是
java.util.List<T>以及参数类型T可分配为消息,组中累积的全部消息列表被发送到该方法。 -
如果参数是非参数化的
java.util.List或者参数类型不可分配于消息,该方法接收累积消息的有效载荷。 -
方法必须返回
true消息组是否准备好进行聚合,否则为false。
以下示例展示了如何使用@ReleaseStrategy注释列表类型消息:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}
以下示例展示了如何使用@ReleaseStrategy注释列表类型字符串:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}
基于前两个示例中的签名,基于POJO的释放策略通过收集尚未发布的消息(如果你需要访问全部内容)消息)或收集对于有效载荷对象(如果类型参数不是消息).
这满足了大多数使用场景。
不过,如果你因为某些原因需要访问完整版消息组你应该提供发布策略接口。
|
在处理可能规模较大的组时,你应了解这些方法是如何被调用的,因为释放策略可以在组被释放前多次调用。
最高效的是实现 基于这些原因,对于大型团队,我们建议实施 |
当该组被释放用于聚合时,所有尚未发布的消息都会被处理并从组中移除。
如果该群也是完备的(即所有来自序列的消息都已到达或没有定义序列),则该群被标记为完备。
该组的任何新消息都会发送到丢弃信道(如有定义)。
设置完成时过期组自true(默认为false) 移除整个组,任何与被移除组关联相同的新消息组成一个新组。
你可以通过使用MessageGroupStoreReaper䋰发送部分结果于到期时被设置为true.
为了方便丢弃迟到消息,聚合器必须在消息发布后保持关于该组的状态。
这最终可能导致内存不足的情况。
为了避免这种情况,你应该考虑配置一个MessageGroupStoreReaper以移除组元数据。
应设置到期参数,当到达某一点后,延迟消息预计不会到达时,组才会过期。
有关配置收割者的信息,请参见聚合器中的状态管理:MessageGroupStore. |
Spring Integration 提供了发布策略:简单序列大小释放策略.
本实现参考了SEQUENCE_NUMBER和SEQUENCE_SIZE每个到达消息的头部,用来决定消息组何时完成并准备聚合。
如前所述,这也是默认策略。
在5.0版本之前,默认发布策略是序列大小释放策略,但该方法在大群体中表现不佳。
通过这种策略,重复序列号被检测并拒绝。
这项手术可能花费不菲。 |
如果你在聚合大群,不需要释放部分群,也不需要检测/拒绝重复序列,可以考虑使用简单序列大小释放策略相反,在这些用例中,它效率更高,并且自5.0版本起默认使用,当部分组发布未被指定时。
聚合大型群体
4.3版本更改了默认设置收集对于 a 中的消息简易消息组自哈希集(之前是阻塞队列).
当从大组中移除单个消息时,这种方式成本较高(需要O(n)的线性扫描)。
虽然哈希集通常移除速度更快,但对于大型消息来说可能成本较高,因为哈希值必须在插入和移除时都计算。
如果你有需要哈希的消息,可以考虑使用其他类型的集合。
如同相关内容用消息组工厂一个简易消息组工厂提供给你选择收集这最适合你的需求。
你也可以提供自己的工厂实现来创建其他收藏<消息<?>>.
以下示例展示了如何配置聚合器,使用之前的实现和简单序列大小释放策略:
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果Filter端点涉及聚合器上游的流,序列大小释放策略(固定或基于序列大小头部)无法发挥其作用,因为序列中的某些消息可能会被过滤器丢弃。
在这种情况下,建议选择其他方案发布策略,或者使用从丢弃子流发送的补偿消息,其中包含内容中某些信息,在自定义完整群函数中可跳过。
更多信息请参见筛选。 |
相关性策略
这相关策略接口定义如下:
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
}
该方法返回对象代表用于将消息与消息组关联的关联键。
密钥必须满足用于密钥的条件地图关于等值()和hashCode().
一般来说,任何POJO都可以实现相关逻辑,且将消息映射到方法参数(或多个参数)的规则与服务激活器(包括支持@Header注释)。
方法必须返回一个值,且该值不得为零.
Spring Integration 提供了相关策略:头部属性相关策略.
该实现返回其中一个消息头部(其名称由构造函数参数指定)作为关联键。
默认情况下,相关策略为头部属性相关策略返回 的值CORRELATION_ID标题属性。
如果你有自定义的头部名称想用于关联,可以在头部属性相关策略并以此作为聚合器相关策略的参考。
锁注册处
对组的更改是线程安全的。
所以,当你同时发送同一相关ID的消息时,聚合器中只会处理其中一个消息,实际上是单线程的每条消息组。
一个锁注册用于获得已解析相关ID的锁。
一个默认锁注册表默认使用(内存内)。
用于在服务器间同步更新,共享MessageGroupStore如果正在使用,你必须配置共享锁注册表。
避免僵局
如上所述,当消息组发生变异(消息添加或释放)时,会被保留锁。
考虑以程:
...->aggregator1-> ... ->aggregator2-> ...
如果有多个线程,且聚合器共享一个共同锁注册表,就有可能出现死锁。
这会导致挂线和jstack <PID>可能得到如下结果:
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
有几种方法可以避免这个问题:
-
确保每个聚合器拥有自己的锁注册表(锁可以是跨应用实例共享的注册表,但流程中的两个或多个聚合器必须各自拥有独立的注册表)
-
使用一个
执行者频道或队列通道作为聚合器的输出通道,使下游流运行在新的线程上 -
从5.1.1版本开始,设置
releaseLockBeforeSend聚合器属性 到true
| 如果某个聚合器的输出最终被路由回同一个聚合器,也可能导致这个问题。 当然,上述第一个解决方案在这种情况下不适用。 |
在Java DSL中配置聚合器
请参阅聚合器和重序列器,了解如何在 Java DSL 中配置聚合器。
使用 XML 配置聚合器
Spring 集成支持通过<聚合器/>元素。
以下示例展示了聚合器的示例:
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" (1)
auto-startup="true" (2)
input-channel="inputChannel" (3)
output-channel="outputChannel" (4)
discard-channel="throwAwayChannel" (5)
message-store="persistentMessageStore" (6)
order="1" (7)
send-partial-result-on-expiry="false" (8)
send-timeout="1000" (9)
correlation-strategy="correlationStrategyBean" (10)
correlation-strategy-method="correlate" (11)
correlation-strategy-expression="headers['foo']" (12)
ref="aggregatorBean" (13)
method="aggregate" (14)
release-strategy="releaseStrategyBean" (15)
release-strategy-method="release" (16)
release-strategy-expression="size() == 5" (17)
expire-groups-upon-completion="false" (18)
empty-group-min-timeout="60000" (19)
lock-registry="lockRegistry" (20)
group-timeout="60000" (21)
group-timeout-expression="size() ge 2 ? 100 : -1" (22)
expire-groups-upon-timeout="true" (23)
scheduler="taskScheduler" > (24)
<expire-transactional/> (25)
<expire-advice-chain/> (26)
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
| 1 | 聚合器的ID是可选的。 |
| 2 | 生命周期属性指示聚合器是否应在应用上下文启动时启动。 可选(默认为“true”)。 |
| 3 | 聚合器接收消息的通道。 必填。 |
| 4 | 聚合器发送聚合结果的通道。 可选(因为收到的消息可以在“replyChannel”消息头中指定回复通道)。 |
| 5 | 聚合器发送超时消息的信道(如果发送部分结果于到期时是false).
自选。 |
| 6 | 对MessageGroupStore用于在其关联键下存储消息组,直到它们完成。
自选。
默认情况下,它是一个挥发性内存存储。
更多信息请参见消息存储。 |
| 7 | 当多个句柄订阅同一句柄时,该聚合器的顺序直达频道(用于负载均衡目的)。
自选。 |
| 8 | 表示过期消息应被汇总并发送到包含的“输出通道”或“回复通道”消息组已过期(参见MessageGroupStore.expireMessageGroups(long)).
一种过期方式消息组是通过配置MessageGroupStoreReaper.
不过,你也可以选择过期消息组通过呼叫MessageGroupStore.expireMessageGroups(timeout).
你可以通过控制总线作实现这一点,或者如果你有对MessageGroupStore实例,通过调用expireMessageGroups(timeout).
否则,单独这个属性就没有任何作用。
它仅作为指示,判断是否丢弃或发送仍处于消息组那条快到期了。
可选(默认为false).
注意:这个属性可能更准确地说是超时时发送部分结果,因为该群实际上可能不会失效,如果超时过期的分组设置为false. |
| 9 | 发送回复时等待的超时间隔消息前往输出通道或丢弃通道.
默认-1,导致阻塞无限期。
只有当输出通道存在某些“发送”限制时,才会应用,例如:队列通道但有固定的“容量”。
在这种情况下,一个MessageDeliveryException被抛出。
为摘要订阅频道实现,发送超时被忽略。
为group-timeout(-表达式)这MessageDeliveryException从计划中的过期任务开始,该任务被重新安排。
自选。 |
| 10 | 引用一个实现消息相关(分组)算法的豆子。
豆子可以是相关策略界面或POJO。
在后一种情况下,相关策略方法属性也必须被定义。
可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID页首)。 |
| 11 | 定义在 由相关策略.
它实现了相关决策算法。
可选,但有限制(相关策略必须存在)。 |
| 12 | 一个表示相关策略的SpEL表达式。
例:“headers['something']”.
只有其中一个相关策略或相关-策略-表达是允许的。 |
| 13 | 在应用上下文中定义的豆子的引用。 豆子必须实现如前所述的聚合逻辑。 可选(默认情况下,聚合消息列表成为输出消息的有效载荷)。 |
| 14 | 定义在由裁判属性。
它实现了消息聚合算法。
可选(这取决于情况裁判属性正在定义中)。 |
| 15 | 指的是实现释放策略的豆子。
豆子可以是发布策略界面或POJO。
在后一种情况下,发布策略方法属性也必须被定义。
可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZE标题属性)。 |
| 16 | 定义在由释放策略属性。
它实现了完备判定算法。
可选,但有限制(释放策略必须存在)。 |
| 17 | 一个代表发布策略的 SpEL 表达式。
表达式的根对象是消息组.
例:“尺寸()== 5”.
只有其中一个释放策略或释放-策略-表达是允许的。 |
| 18 | 当设置为true(默认为false完成的组会从消息存储中移除,后续具有相同关联的消息则形成新的组。
默认行为是发送与完成组相对于丢弃通道. |
| 19 | 仅当MessageGroupStoreReaper配置为消息商店关于<聚合器>.
默认情况下,当MessageGroupStoreReaper配置为部分组的过期,空组也会被移除。
空组通常在释放组后出现。
空组用于检测和丢弃迟到消息。
如果你希望空组的过期时间比部分组过期更长,可以设置此属性。
空组则不会从中移除消息商店直到至少在这毫秒内未被修改。
注意,空组的实际到期时间也会受到收割者超时属性,可以是这个值加上超时。 |
| 20 | 对org.springframework.integration.util.LockRegistry豆。
它曾经获得锁基于组ID对于并行作消息组.
默认情况下,内部默认锁注册表被使用。
分布式的使用锁注册,例如Zookeeper 锁注册表确保聚合器只有一个实例可以同时在一个组上运行。
更多信息请参见Redis Lock Registry或Zookeeper Lock Registry。 |
| 21 | 超时(以毫秒计)来强制消息组当发布策略当前消息到达时不会释放该组。
该属性为聚合器提供了内置的时间基础释放策略,当需要发出部分结果(或丢弃该组)时,如果没有新的消息到达消息组在超时时间内,从最后一条消息到达开始算。
设置一个超时,从时间开始计算消息组被创建,见组-超时-表达式信息。
当新消息到达聚合器时,任何已有的消息ScheduledFuture<?>对于其消息组被取消了。
如果发布策略返回false(意为不放手)和小组暂停> 0,一个新任务被安排以终止该组。
我们不建议将该属性设为零(或为负值)。
这样做实际上会禁用聚合器,因为每个消息组都会立即完成。
不过,你可以通过使用表达式来有条件地将其设置为零(或负值)。
看组-超时-表达式获取信息。
完成过程中采取的动作取决于发布策略以及send-partial-group-on-expiry属性。
更多信息请参见聚合器和组超时。
它与“组-超时-表达式”属性互斥。 |
| 22 | SpEL表达式,使得小组暂停其中消息组作为#root评估上下文对象。
用于调度消息组被迫完成。
如果该表达式对应为零,完成过程未被安排。
如果计算为零,则该组立即在当前线程上完成。
实际上,这形成了一种动态小组暂停财产。
举个例子,如果你想强制完成一个消息组自组建以来10秒后,你可以考虑使用以下SpEL表达式:时间戳 + 10000 - T(System).currentTimeMillis()哪里时间戳由MessageGroup.getTimestamp()作为消息组这里是#root评估上下文对象。
但请注意,组的创建时间可能与第一个到达消息的时间不同,这取决于其他组到期属性的配置。
看小组暂停更多信息请见。
与“群体暂停”属性互斥。 |
| 23 | 当一个小组因超时(或MessageGroupStoreReaper),该组默认已过期(完全移除)。
迟到的消息会启动新群组。
你可以把它设置为false以完成该组,但其元数据保持不变,以便丢弃迟到的消息。
空组可以随后通过MessageGroupStoreReaper与空组最小超时属性。
默认为“true”。 |
| 24 | 一个任务调度器BEAN 引用以安排消息组如果没有新的消息到达,则强制完成消息组在小组暂停.
如果没有提供,默认调度器(任务调度器)注册于应用上下文 (ThreadPoolTaskScheduler)被使用。
该属性不适用于小组暂停或组-超时-表达式未具体说明。 |
| 25 | 自4.1版本起。
它允许对forceComplete(力道完成)操作。
它从一个group-timeout(-表达式)或者由MessageGroupStoreReaper且不应用于正规加,释放和丢弃操作。
只有该子元素或<过期-建议链/>是允许的。 |
| 26 | 自4.1版本起。
它允许配置任意建议对于forceComplete(力道完成)操作。
它从一个group-timeout(-表达式)或者由MessageGroupStoreReaper且不应用于正规加,释放和丢弃操作。
只有该子元素或<过期事务/交易>是允许的。
一笔交易建议也可以在这里使用Spring进行配置德州Namespace。 |
|
过期组别
关于过期(完全移除)组有两个属性。
当一个组过期时,不会有记录;如果新消息以相同相关性到达,则会启动一个新的组。
当一个组完成(无过期)时,空组依然存在,迟到的消息被丢弃。
空组可以通过后续使用
如果一组未正常完成,但因超时被释放或丢弃,通常该组会失效。
从4.1版本开始,你可以通过以下方式来控制这种行为
自5.0版本起,空组也会在之后被安排移除 从5.4版本开始,聚合器(和重排序器)可以配置为终止孤儿组(即存在于持久消息存储中可能不会发布的组)。
这 |
我们通常建议使用裁判属性,如果可以引用自定义聚合器处理程序的实现,则可在其他中引用<聚合器>定义。
然而,如果一个自定义聚合器实现仅由单一定义使用<聚合器>你可以使用内部豆定义(从版本1.0.3开始)在<聚合器>元素,如下例所示:
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
同时使用裁判属性和内部豆子定义在同一个<聚合器>不允许配置,因为这会产生歧义。
在这种情况下,会抛出异常。 |
以下示例展示了聚合豆的实现:
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}
前述示例中完成策略豆的实现可能如下:
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}
| 在合适的地方,发布策略法和聚合法可以合并成一个整体。 |
上述示例中相关策略豆的实现可能如下:
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}
前述示例中的聚合器会按某种标准(此处为除以十后的剩余数)分组,并保持该组直到有效载荷提供的数字之和超过某个值。
| 在合适的地方,释放策略法、相关策略法和聚合法可以合并在一个豆子里。 (实际上,它们全部或任意两个都可以组合。) |
聚合器与 Spring 表达式语言(SpEL)
自 Spring Integration 2.0 起,你可以用 SpEL 处理各种策略(关联、发布和聚合),如果发布策略背后的逻辑相对简单,我们推荐采用这种方法。
假设你有一个遗留组件,设计用来接收一组对象。
我们知道默认发布策略将所有聚合消息汇聚于列表.
现在我们有两个问题。
首先,我们需要从列表中提取单独的消息。
其次,我们需要提取每个消息的有效载荷并组装对象数组。
以下示例解决了这两个问题:
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<String>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}
然而,使用 SpEL,这种需求其实可以用一行表达式相对容易地处理,从而避免了写自定义类并将其配置为 bean 的过程。 以下示例展示了如何实现:
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
在之前的配置中,我们使用集合投影表达式从列表中所有消息的有效载荷中组装出一个新的集合,然后将其转换为数组,从而实现了与之前 Java 代码相同的结果。
在处理自定义释放和相关策略时,也可以采用同样的基于表达式的方法。
而不是为习俗定义豆子相关策略在相关策略属性,你可以将简单的相关逻辑实现为 SpEL 表达式,并在相关-策略-表达属性,如下例所示:
correlation-strategy-expression="payload.person.id"
在前述例子中,我们假设有效载荷具有人属性身份证,将用于关联消息。
同理,对于发布策略你可以将发布逻辑实现为 SpEL 表达式,并在释放-策略-表达属性。
评估上下文的根对象是消息组本身。
这列表可以通过以下方式引用 。消息表达式中群的属性。
在5.0版本之前的版本中,根对象是留言<?>,如前例所示: |
release-strategy-expression="!messages.?[payload==5].empty"
在上述示例中,SpEL评估上下文的根对象是消息组你说,一旦出现带有有效载荷的消息5在这个群体中,应该释放这个群体。
聚合器与组超时
从4.0版本开始,引入了两个互斥属性:小组暂停和组-超时-表达式.
参见“配置 XML 聚合器”。
在某些情况下,如果发布策略当前消息到达时不会释放。
为此,小组暂停选项:调度消息组被迫完成,如下示例所示:
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
在此示例中,如果聚合器收到了按顺序定义的最后一条消息,则正常释放是可能的释放-策略-表达.
如果该消息没有到达,小组暂停只要该组至少包含两个消息,则强制该组在十秒后完成。
强制该群完成的结果取决于发布策略以及发送部分结果于到期时.
首先,再次咨询释放策略,以确定是否需要进行正常释放。
虽然该组织未曾改变,发布策略目前可以决定释放该组合。
如果释放策略仍然无法释放该组,该策略即告失效。
如果发送部分结果于到期时是true,(部分)中已有的消息消息组作为正常聚合器回复消息发布给输出通道.
否则,该物品将被弃用。
两者之间是有区别的小组暂停行为和MessageGroupStoreReaper(参见使用 XML 配置聚合器)
收割者对所有玩家发起强制完成消息组s 在MessageGroupStore周期性地。
这小组暂停每个人都做消息组如果在小组暂停.
此外,收割器还可用于移除空组(空组用于丢弃延迟消息完成时过期组是错误的)。
从5.5版本开始,组超时表达式可以被评估为java.util.Date实例。
这在如根据组创建时间确定预定任务时刻等情况下非常有用(MessageGroup.getTimestamp())而非计算时的当前消息到达组超时表达式被评估为长:
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
配置带有注释的聚合器
以下示例展示了一个配置有注释的聚合器:
public class Waiter {
...
@Aggregator (1)
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy (2)
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy (3)
public String correlateBy(OrderItem item) {
...
}
}
| 1 | 注释说明该方法应用作聚合器。 如果该类被用作聚合器,必须明确说明。 |
| 2 | 注释说明该方法被用作聚合器的发布策略。
如果在任何方法中都不存在,聚合器则使用简单序列大小释放策略. |
| 3 | 注释说明该方法应作为聚合器的相关策略使用。
如果没有相关性策略,聚合器会使用头部属性相关策略基于CORRELATION_ID. |
XML元素提供的所有配置选项也适用于@Aggregator注解。
聚合器可以明确引用 XML或如果@MessageEndpoint定义在类上,通过类路径扫描自动检测。
注释配置(@Aggregator以及其他)聚合器组件只涵盖简单场景,大多数默认选项已足够。
如果你在使用注释配置时需要对这些选项有更多控制,可以考虑使用@Bean定义聚合消息处理器并标记其@Bean方法:@ServiceActivator,如下示例所示:
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}
从4.2版本开始,聚合器FactoryBean可用来简化 Java 配置聚合消息处理器. |
聚合器中的状态管理:MessageGroupStore
聚合器(以及 Spring Integration 中的某些其他模式)是一种有状态模式,要求基于一组在一段时间内到达的消息做出决策,这些消息都具有相同的相关键。
有状态模式中接口的设计(例如发布策略)的驱动原则是组件(无论是由框架定义还是由用户定义)都应能够保持无状态状态。
所有状态都由消息组其管理权委托给MessageGroupStore.
这MessageGroupStore接口定义如下:
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}
更多信息请参见Javadoc。
这MessageGroupStore积累状态信息消息组在等待释放策略触发期间,而那个事件可能永远都不会发生。
因此,为了防止陈旧消息持续存在,并且让易失的存储在应用关闭时提供清理的钩子,MessageGroupStore允许你注册回调以应用到消息组当他们过期时。
界面非常直观,如下列表所示:
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
回调可以直接访问存储和消息组,以便管理持久状态(例如,完全将组从存储中移除)。
这MessageGroupStore维护这些回调列表,按需应用于所有时间戳早于参数时间的消息(参见registerMessageGroupExpiryCallback(..)和expireMessageGroups(..)方法,前面描述)。
重要的是不要使用相同的信息MessageGroupStore在不同的聚合器组件中,当你打算依赖expireMessageGroups功能性。
每摘要相关消息处理注册自己的消息组回拨基于forceComplete()回调。
这样,每个待到期的组都可以被错误的聚合器完成或丢弃。
从版本5.0.10开始,UniqueExpiryCallback是从摘要相关消息处理对于注册回拨,在MessageGroupStore.
这MessageGroupStore而,则检查该类实例的存在,并在回调集中已有错误时,记录错误并发送相应消息。
这样,框架就不允许使用MessageGroupStore在不同的聚合器/重排序器中实例化,以避免上述的过期副作用,导致特定相关处理程序未创建的组。 |
你可以打电话给expireMessageGroups方法,带有超时值。
任何比当前时间减去该值更早的消息均已过期,回调已生效。
因此,存储用户定义了消息组“过期”的含义。
为了方便用户,Spring Integration 提供了一个封装消息的封装器,形式为MessageGroupStoreReaper,如下示例所示:
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
死神是可跑.
在前述示例中,消息组存储的过期方法每十秒调用一次。
超时时间是30秒。
需要理解的是,“超时”属性MessageGroupStoreReaper是一个近似值,并且受任务调度器速率的影响,因为该属性仅在下一次计划执行时才被检查,MessageGroupStoreReaper任务。
例如,如果超时时间设置为十分钟,但MessageGroupStoreReaper任务每小时运行一次,最后一次执行MessageGroupStoreReaper任务发生在暂停前一分钟,消息组接下来的59分钟内不会失效。
因此,我们建议将超时率设定为至少等于或更短的超时值。 |
除了收割器外,当应用程序通过生命周期回调关闭时,也会调用到期回调摘要相关消息处理.
这摘要相关消息处理注册自身的到期回调,这就是带有布尔标志的链接发送部分结果于到期时在聚合器的XML配置中。
如果标志设置为true然后,当调用到期回调时,尚未释放的未标记消息可以发送到输出通道。
自从......MessageGroupStoreReaper从调度任务中调用,可能会生成一条消息(取决于sendPartialResultOnExpiry选项)对下游集成流程的建议提供自定义任务调度器其中消息发布错误处理通过errorChannel,这可能是常规聚合器发布功能所预期的。
同样的逻辑也适用于组超时功能,该功能同样依赖于任务调度器.
更多信息请参见错误处理。 |
|
当共享时 一些 欲了解更多关于 |
通量聚合器
在5.2版本中,FluxAggregatorMessageHandler组件已被引入。
它基于反应堆项目Flux.groupBy()和Flux.window()运营商。
收到的消息会被发送到通量汇由Flux.create()在该组件的构造器中。
如果输出通道不提供或不是 的实例ReactiveStreamsSubscribeableChannel(可订阅频道),主订阅通量由Lifecycle.start()实现。
否则,将推迟到由ReactiveStreamsSubscribeableChannel(可订阅频道)实现。
消息按Flux.groupBy()使用相关策略对于组键。
默认情况下,IntegrationMessageHeaderAccessor.CORRELATION_ID查询消息的头部。
默认情况下,每个关闭窗口都以通量在消息的有效载荷中产生。
该消息包含窗口中第一个消息的所有头部。
这通量在输出消息中,有效载荷必须订阅并下游处理。
这样的逻辑可以被setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)配置选项FluxAggregatorMessageHandler.
例如,如果我们希望有列表在最后一条消息中,我们可以配置Flux.collectList()像下面这样:
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));
在FluxAggregatorMessageHandler选择合适的窗口策略:
-
setBoundaryTrigger(谓词<消息<?>>)- 传播到Flux.windowUntil()算子。 更多信息请参见其JavaDocs。 它优先于所有其他窗户选项。 -
setWindowSize(int)和setWindowSizeFunction(Function<Message<?>, Integer>)- 传播到Flux.window(int)或windowTimeout(int, Duration). 默认情况下,窗口大小是从组中的第一个消息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE页眉。 -
setWindowTimespan(Duration)- 传播到通量窗口(持续时间)或windowTimeout(int, Duration)这取决于窗口大小的配置。 -
setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)- 一个函数,用于对任何未被暴露选项覆盖的自定义窗口作应用到分组通量中的变换。
由于该分量是消息处理器实现它可以简单地用作@Bean与@ServiceActivator消息注释。
通过Java DSL,它可以从中使用.handle()EIP方法。
下面的示例展示了我们如何注册一个集成流程运行时以及如何FluxAggregatorMessageHandler可以与上游的分流器相关联:
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);
重序器
重序器与聚合器相关,但功能不同。 聚合器合并消息时,重序器通过消息而不更改消息。
功能性
重序列器的工作方式类似于聚合器,因为它使用CORRELATION_ID用于将消息分组存储。
区别在于重序器不以任何方式处理这些消息。
相反,它会按他们的顺序释放它们SEQUENCE_NUMBER标头值。
关于这一点,你可以选择一次性释放所有消息(在整个序列结束后,根据SEQUENCE_SIZE以及其他可能性)或在有有效序列出现时。
(我们在本章后面会介绍“有效序列”的含义。)
| 重序器旨在重排序具有较小间隙的相对较短的消息序列。 如果你有大量不相交且间隙多的序列,可能会遇到性能问题。 |
配置重序器
关于在Java DSL中配置重序列器,请参见聚合器和重序列器。
配置重序器只需在XML中包含相应元素即可。
以下示例展示了重序器的配置:
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:resequencer id="completelyDefinedResequencer" (1)
input-channel="inputChannel" (2)
output-channel="outputChannel" (3)
discard-channel="discardChannel" (4)
release-partial-sequences="true" (5)
message-store="messageStore" (6)
send-partial-result-on-expiry="true" (7)
send-timeout="86420000" (8)
correlation-strategy="correlationStrategyBean" (9)
correlation-strategy-method="correlate" (10)
correlation-strategy-expression="headers['something']" (11)
release-strategy="releaseStrategyBean" (12)
release-strategy-method="release" (13)
release-strategy-expression="size() == 10" (14)
empty-group-min-timeout="60000" (15)
lock-registry="lockRegistry" (16)
group-timeout="60000" (17)
group-timeout-expression="size() ge 2 ? 100 : -1" (18)
scheduler="taskScheduler" /> (19)
expire-group-upon-timeout="false" /> (20)
| 1 | 重序列器的ID是可选的。 |
| 2 | 重序器的输入通道。 必填。 |
| 3 | 重序器发送重排序消息的通道。 自选。 |
| 4 | 重序器发送超时消息的通道(如果超时时发送部分结果设置为false).
自选。 |
| 5 | 是要在有序序列可用时立即发送,还是等整个消息组都到齐后再发送。
自选。
(默认为false.) |
| 6 | 对MessageGroupStore该功能可用于在其关联键下存储消息组,直到消息完整。
自选。
(默认为易失内存存储。) |
| 7 | 在组到期后,是否应该发送该有序组(即使部分消息缺失)。
自选。
(默认为假。)
看聚合器中的状态管理:MessageGroupStore. |
| 8 | 发送回复时等待的超时间隔消息前往输出通道或丢弃通道.
默认-1,该会无限期地被阻挡。
只有当输出通道存在某些“发送”限制时,才会应用,例如:队列通道但有固定的“容量”。
在这种情况下,一个MessageDeliveryException被抛出。
这发送超时忽略了摘要订阅频道实现。
为group-timeout(-表达式)这MessageDeliveryException从计划到期任务中,该任务被重新调度。
自选。 |
| 9 | 引用一个实现消息相关(分组)算法的豆子。
豆子可以是相关策略界面或POJO。
在后一种情况下,相关策略方法属性也必须被定义。
自选。
(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID标题。) |
| 10 | 定义在由相关策略并且实现了相关决策算法。
可选,但有限制(要求)相关策略要在场)。 |
| 11 | 一个表示相关策略的SpEL表达式。
例:“headers['something']”.
只有其中一个相关策略或相关-策略-表达是允许的。 |
| 12 | 指的是实现释放策略的豆子。
豆子可以是发布策略界面或POJO。
在后一种情况下,发布策略方法属性也必须被定义。
可选(默认情况下,聚合器会使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZE标题属性)。 |
| 13 | 定义在由释放策略而这实现了完备决策算法。
可选,但有限制(要求)释放策略要在场)。 |
| 14 | 一个代表发布策略的 SpEL 表达式。
表达式的根对象是消息组.
例:“尺寸()== 5”.
只有其中一个释放策略或释放-策略-表达是允许的。 |
| 15 | 仅当MessageGroupStoreReaper配置为<重序列器> 消息商店.
默认情况下,当MessageGroupStoreReaper配置为部分组的过期,空组也会被移除。
空组在正常释放后仍存在。
这是为了能够检测并丢弃迟到的消息。
如果你希望空组的过期时间比部分组过期更长,可以设置此属性。
空组则不会从中移除消息商店直到至少在这毫秒内未被修改。
注意,空组的实际到期时间也受收割者的超时属性影响,可能高达该值加上超时。 |
| 16 | 参见“配置 XML 聚合器”。 |
| 17 | 参见“配置 XML 聚合器”。 |
| 18 | 参见“配置 XML 聚合器”。 |
| 19 | 参见“配置 XML 聚合器”。 |
| 20 | 默认情况下,当一个组因超时(或MessageGroupStoreReaper),空组的元数据被保留。
迟到的消息会立即被丢弃。
将此设为true彻底移除该群体。
然后,迟到的消息会启动一个新组,直到组再次超时前才会被丢弃。
由于序列范围存在“漏洞”导致超时,新组从未正常释放。
空组可以通过MessageGroupStoreReaper与空组最小超时属性。
从5.0版本开始,空组也会在空组最小超时流逝。
默认是“假”。 |
更多信息请参见聚合器过期组。
| 由于 Java 类中没有可实现的自定义行为,因此没有注释支持。 |
消息处理链
这消息处理链是 的实现消息处理器它可以配置为单一消息端点,同时实际上委派给一系列其他处理程序,如Filter、变换器、分线器等。
当多个处理程序需要以固定的线性进程连接时,这会导致配置更加简单。
例如,通常在其他元件之前先提供转换器。
同样,当你在链中的其他组件之前提供过滤器时,实际上就是创造了一个选择性消费者。
无论哪种情况,链条只需一个输入通道以及一首单曲输出通道,消除了为每个组件定义通道的需求。
这消息处理链主要为XML配置设计。
对于Java DSL,一个集成流程定义可以被视为链式组件,但它与本章所述的概念和原则无关。
更多信息请参见Java DSL。 |
春季集成Filter提供一个布尔属性:throwExceptionOnRejection.
当你在同一点对点信道上为多个选择性消费者提供不同的接受标准时,应将该值设置为“true”(默认为false这样调度员就知道消息被拒绝了,因此会尝试将消息传递给其他订阅者。
如果没有抛出异常,调度器会认为消息已成功传递,尽管过滤器已丢弃该消息以防止进一步处理。
如果你确实想“丢弃”消息,过滤器的“丢弃通道”可能有用,因为它给你机会对丢弃消息执行某些作(比如发送到JMS队列或写入日志)。 |
处理链简化了配置,同时内部保持组件间的松耦合程度,如果需要非线性排列,修改配置也变得简单。
内部链被扩展为列出的端点线性排列,中间由匿名通道分隔。
响应信道头部在链中不被考虑。
只有在最后一个处理器被调用后,生成的消息才会被转发到回复信道或链的输出信道。
由于这种设置,除最后一个处理程序外,所有处理程序都必须实现消息制作人接口(提供一种“setOutputChannel()”方法)。
如果输出通道在消息处理链是设置的,最后一个处理器只需要一个输出通道。
与其他端点一样,输出通道是可选的。
如果链条末端有回复消息,输出通道优先。
然而,如果无法使用该信道头,链处理程序会检查入站消息的回复信道头作为备援。 |
在大多数情况下,你不必实现消息处理器你自己。
下一节重点介绍链元素的命名空间支持。
大多数 Spring Integration 端点,如服务激活器和变换器,都适合在消息处理链.
链的配置
这<链>元素提供输入通道属性。
如果链中的最后一个元素能够生成回复消息(可选),它也支持输出通道属性。
子元件包括Filter、转换器、分配器和服务激活器。
最后一个元件也可以是路由器或出站通道适配器。
以下示例展示了链式定义:
<int:chain input-channel="input" output-channel="output">
<int:filter ref="someSelector" throw-exception-on-rejection="true"/>
<int:header-enricher>
<int:header name="thing1" value="thing2"/>
</int:header-enricher>
<int:service-activator ref="someService" method="someMethod"/>
</int:chain>
这<header-enricher>前述示例中使用的元素设置一个名为东西1其值为东西2在留言中。
头部增益器是 的一种专门化转换器只涉及头部值。
你可以通过实现消息处理器它做了排气头的修改和接线,但加热喷头增强器是更简单的选择。
这<链>可以配置为消息流的最后一个“封闭盒”用户。
对于这个解决方案,你可以把它放在<链末端>某个<出站通道适配器>,如下示例所示:
<int:chain input-channel="input">
<int-xml:marshalling-transformer marshaller="marshaller" result-type="StringResult" />
<int:service-activator ref="someService" method="someMethod"/>
<int:header-enricher>
<int:header name="thing1" value="thing2"/>
</int:header-enricher>
<int:logging-channel-adapter level="INFO" log-full-message="true"/>
</int:chain>
|
禁止的属性和元素
某些属性,例如 对于 Spring Integration 的核心组件,XML 模式本身就强制执行了这些约束。 然而,对于非核心组件或你自己的自定义组件,这些约束是由XML命名空间解析器强制执行,而非XML模式。 这些XML命名空间解析器约束是在Spring Integration 2.2中添加的。
如果你尝试使用不允许的属性和元素,XML 命名空间解析器会抛出 |
使用 'id' 属性
从 Spring Integration 3.0 开始,如果给定链元素身份证属性,元素的豆名是链条的组合身份证以及身份证元素本身。
元素身份证属性不被注册为豆子,但每个属性都被赋予组件名称这包括链条身份证.
请考虑以下例子:
<int:chain id="somethingChain" input-channel="input">
<int:service-activator id="somethingService" ref="someService" method="someMethod"/>
<int:object-to-json-transformer/>
</int:chain>
在前面的例子中:
-
这
<链>根元素具有身份证“某物链”的。 因此,摘要终点实现(民调消费者或EventDrivenConsumer,根据输入通道类型)豆子将此值作为豆子名称。 -
这
消息处理链豆子获得了豆子别名(“somethingChain.handler”),允许从豆子工厂. -
这
<服务激活器>它不是一个完整的消息终端(它不是民调消费者或EventDrivenConsumer). 它是消息处理器在<链>. 在这种情况下,注册在豆子工厂是“某某Chain$child.somethingService.handler”。 -
这
组件名称这件事服务激活处理器取值相同,但不带“.handler”后缀。 它变成了“somethingChain$child.somethingService”。 -
最后
<链>子组件,<object-to-json-transformer>,没有身份证属性。 其组件名称基于其在<链>. 在这种情况下,是“somethingChain$child#1”。 (名称的最后一个元素是链条中的顺序,以“#0”开头)。 注意,该变换器在应用上下文中未被注册为豆子,因此不会获得豆名. 然而,它组件名称具有对日志记录及其他用途有用的值。
提供明确的身份证属性<链>元素简化日志中子组件的识别,并提供从豆子工厂等。 |
从链中调用链
有时,你需要在链中对另一条链进行嵌套调用,然后返回继续在原链内执行。 为此,你可以通过包含<gateway>元素来使用消息网关,如下示例所示:
<int:chain id="main-chain" input-channel="in" output-channel="out">
<int:header-enricher>
<int:header name="name" value="Many" />
</int:header-enricher>
<int:service-activator>
<bean class="org.foo.SampleService" />
</int:service-activator>
<int:gateway request-channel="inputA"/>
</int:chain>
<int:chain id="nested-chain-a" input-channel="inputA">
<int:header-enricher>
<int:header name="name" value="Moe" />
</int:header-enricher>
<int:gateway request-channel="inputB"/>
<int:service-activator>
<bean class="org.foo.SampleService" />
</int:service-activator>
</int:chain>
<int:chain id="nested-chain-b" input-channel="inputB">
<int:header-enricher>
<int:header name="name" value="Jack" />
</int:header-enricher>
<int:service-activator>
<bean class="org.foo.SampleService" />
</int:service-activator>
</int:chain>
在上述例子中,嵌套链-A在 的结尾称为主链由配置好的“网关”元素进行处理。
在嵌套链-A,呼唤嵌套链-b是在header enrichment之后完成的。
然后流程回来完成执行嵌套链-b.
最后,流程回到主链.
当 a 的嵌套版本时<网关>元素在链中定义,不需要服务接口属性。
相反,它会将消息当前状态放入定义的信道上请求信道属性。
当该网关发起的下游流完成时,a消息被送回传送门,继续在当前链条中的旅程。
散布-聚集
从4.1版本开始,Spring Integration提供了散点-收集企业集成模式的实现。 它是一个复合端点,目标是向接收者发送消息并汇总结果。 正如企业集成模式中所述,它是“最佳报价”等场景的组成部分,需要向多个提供商请求信息,并决定哪一家能为所请求的项目提供最佳术语。
此前,图案可以通过使用离散组件进行配置。 这种增强带来了更便捷的配置。
这散布收集处理器是一个请求-回复端点,结合了发布订阅频道(或收件人列表路由器) 以及聚合消息处理器.
请求消息会发送到散射通道,以及散布收集处理器等待聚合器发送给输出通道.
功能性
这散布-聚集模式暗示两种情景:“拍卖”和“分配”。
在这两种情况下,集合体功能相同,提供所有可用的选项聚合消息处理器.
(实际上,散布收集处理器只需一个聚合消息处理器作为构造子论证。)
更多信息请参见聚合器。
拍卖
拍卖散布-聚集变体使用“发布-订阅”逻辑处理请求消息,其中“散射”信道为发布订阅频道跟apply-sequence=“true”.
然而,这个通道可以是任意的消息频道实现(如同请求信道在内容丰富——参见内容丰富剂)。
不过,在这种情况下,你应该自己创建一个自定义相关性策略对于集合体功能。
分配
分布散布-聚集变体基于收件人列表路由器(参见收件人列表路由器)并包含所有可用的选项收件人列表路由器.
这是第二个散布收集处理器构造者论证。
如果你只想依赖默认相关性策略对于收件人列表路由器以及聚合,你应该具体说明apply-sequence=“true”.
否则,你应该提供定制服务相关性策略对于聚合.
与发布订阅频道变体(拍卖变体),具有收件人列表路由器 选择器选项可以根据消息筛选目标提供商。
跟apply-sequence=“true”,默认序列大小是提供的,并且聚合能够正确释放该组。
分发选项与拍卖选项是互斥的。
这applySequence=true仅用于基于ScatterGatherHandler(MessageHandler散布器,MessageHandler收集器)构造器配置,因为框架不能变异外部提供的组件。
为方便起见,XML 和 Java DSL 为散布-聚集集应用序列从6.0版本开始,真理。 |
对于拍卖和分发两种变体,请求(散布)消息都被丰富了gatherResultChannel以 头部等待来自聚合.
默认情况下,所有提供商都应将结果发送给回复频道头部(通常通过省略输出通道从终极终点开始)。
然而,gatherChannel还提供一个选项,允许提供商将回复发送到该渠道进行聚合。
配置散点收集端点
以下示例展示了 bean 定义的 Java 配置散布-聚集:
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}
@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}
@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}
在前面的例子中,我们配置了收件人列表路由器 分配器豆子applySequence=“true”以及接收频道列表。
下一颗豆子是给聚合消息处理器.
最后,我们把这两颗豆子都注射进散布收集处理器豆子定义并标记为@ServiceActivator将散收集组件接入积分流。
以下示例展示了如何配置<散开聚集的声音>通过使用 XML 命名空间实现端点:
<scatter-gather
id="" (1)
auto-startup="" (2)
input-channel="" (3)
output-channel="" (4)
scatter-channel="" (5)
gather-channel="" (6)
order="" (7)
phase="" (8)
send-timeout="" (9)
gather-timeout="" (10)
requires-reply="" > (11)
<scatterer/> (12)
<gatherer/> (13)
</scatter-gather>
| 1 | 端点的ID。
这散布收集处理器Bean的别名为id + '.handler'.
这收件人列表路由器Bean的别名为id + '.scatterer'.
这AggregatingMessageHandler'bean注册的别名为“id + '.gatherer”.
自选。
(豆子工厂生成违约身份证价值。) |
| 2 | 生命周期属性指示终端是否应在应用上下文初始化时启动。
此外,散布收集处理器其他实现生命周期开始又停止gatherEndpoint,如果 a ,则内部生成收集通道提供。
自选。
(默认为true.) |
| 3 | 接收请求消息的通道,以处理这些请求散布收集处理器.
必填。 |
| 4 | 所指向的通道散布收集处理器发送聚合结果。
自选。
(收到的消息可以在回复频道消息头部)。 |
| 5 | 拍卖场景中散点消息的发送渠道。
自选。
与<散布者>子元素。 |
| 6 | 接收来自每个提供商回复的渠道。
它被用作回复频道散点消息中的头部。
自选。
默认情况下,固定订阅频道被创造出来。 |
| 7 | 当多个处理器订阅同一组件时,该组件的顺序直达频道(用于负载均衡目的)。
自选。 |
| 8 | 指定端点应在哪个阶段开始和停止。
启动订单从低到高依序,关闭订单从高到低。
默认情况下,这个值为Integer.MAX价值,意味着该容器尽可能晚开始,尽快停止。
自选。 |
| 9 | 发送回复时等待的超时间隔消息前往输出通道.
默认情况下,发送()阻挡了一秒钟。
只有当输出通道存在某些“发送”限制时,它才适用——例如,队列通道拥有固定的“容量”,且容量是满的。
在这种情况下,一个MessageDeliveryException被抛出。
这发送超时忽略了摘要订阅频道实现。
为group-timeout(-表达式)这MessageDeliveryException从计划到期任务中,该任务被重新调度。
自选。 |
| 10 | 它允许你指定散布收集等待回复消息的时间,然后返回。
默认情况下,它会无限期等待。
如果回复超时,则返回“null”。
自选。
它默认为-1,意为无限期等待。 |
| 11 | 指定散点采集是否必须返回非空值。
该值为true默认。
因此,一个回复必需例外当底层聚合器在 之后返回空值时,会被抛出集合超时.
注意,如果零是一种可能性,且集合超时应明确说明以避免无限期等待。 |
| 12 | 这<收件人列表路由器>选项。
自选。
互斥的散射通道属性。 |
| 13 | 这<聚合器>选项。
必填。 |
错误处理
由于散点收集是多请求-回复组件,错误处理增加了一些复杂性。
在某些情况下,如果发布策略这样流程结束时回复数量少于请求数。
在其他情况下,当发生错误时,应考虑发送类似“补偿消息”的返回。
每个异步子流都应配置为errorChannel用于发送正确错误消息的头部消息发布错误处理.
否则,错误将发送给全局errorChannel采用常见的错误处理逻辑。
有关异步错误处理的更多信息,请参见错误处理。
同步流可能使用ExpressionEvaluatingRequestHandlerAdvice因为忽视异常或返回补偿消息。
当某个子流抛出异常时散布收集处理器,它只是被重新抛向上游。
这样其他子流就不会无效,他们的回复也会被忽略散布收集处理器.
这有时可能是预期行为,但在大多数情况下,处理特定子流的错误时不影响其他所有子流和采集器中的期望会更好。
从5.1.3版本开始,散布收集处理器配备errorChannelName选择。
其人口分布为errorChannel散点消息的头部,用于发生异步错误时,或在常规同步子流中直接发送错误消息。
下面的示例配置通过返回补偿消息演示了异步错误处理:
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
为了产生正确的回复,我们必须复制头部(包括回复频道和errorChannel)失败消息关于消息异常该文件已发送至散点收集错误频道由消息发布错误处理.
这样,目标异常会返回给散布收集处理器回复消息组完成。
真是个例外有效载荷可以在消息组处理器收集者或在散收集端点之后的下游处理。
在将散射结果发送给采集者之前,散布收集处理器恢复请求消息头部,包括回复信道和错误信道(如有)。
这样,错误来自聚合消息处理器即使在散布接收者子流中应用异步切换,也会传播给呼叫者。
为了成功作,agatherResultChannel,原始回复频道和原版ErrorChannel(原版ErrorChannel)头部必须从散布接收者子流中返回回复。
在这种情况下,一个合理的有限gatherTimeout必须配置为散布收集处理器.
否则,默认情况下会被永久屏蔽,等待采集者的回复。 |
线障
有时,我们需要暂停消息流线程,直到发生其他异步事件。 例如,考虑一个向RabbitMQ发布消息的HTTP请求。 我们可能希望在RabbitMQ经纪人收到消息确认之前不回复用户。
在4.2版本中,Spring Integration引入了<屏障/>组件用于此目的。
基础消息处理器是障碍信息处理者.
该类还实现了消息触发动作,其中一条消息传递给触发()方法释放对应的线程handleRequestMessage()方法(如果存在的话)。
悬挂线程和触发线程通过调用相关策略在消息里。
当消息发送到输入通道,线程悬挂至最多请求超时毫秒,等待相应的触发信息。
默认相关策略使用IntegrationMessageHeaderAccessor.CORRELATION_ID页眉。
当触发消息与其相关性相同时,线程被释放。
发送给输出通道释放后通过使用消息组处理器.
默认情况下,消息是收藏<?>两个有效载荷中,头部通过DefaultAggregatingMessageGroupProcessor.
如果触发()方法首先被调用(或主线程超时后),它会被暂停,时间最多为触发超时等待暂停信息的到来。
如果你不想暂停触发线程,可以考虑把它交给任务执行者而是让其螺丝悬浮。 |
在之前的5.4版本中,只有一个超时请求消息和触发消息都有选项,但在某些使用场景中,这些动作最好设置不同的超时时间。
因此请求超时和触发超时已经引入了多种选项。 |
这需要回复属性决定了如果暂停线程在触发消息到达前超时,应采取的作。
默认情况下,它是false,这意味着端点返回零,流结束,线程返回调用者。
什么时候true一个回复必需例外被抛出。
你可以打电话给触发()程序化方法(通过使用名称获取豆子引用,障碍。处理者——其中障碍是势垒端点的豆名)。
或者,你可以配置一个<出站通道适配器/>触发释放。
| 只有一个线程可以被悬挂,且相关性相同。 相同的相关性可以多次使用,但只能同时使用一次。 如果第二个线程到达且具有相同相关性,则抛出异常。 |
以下示例展示了如何使用自定义头部进行关联:
@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateTriggerChannel);
return barrier;
}
@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
correlation-strategy-expression="headers['myHeader']"
output-processor="myOutputProcessor"
discard-channel="lateTriggerChannel"
timeout="10000">
</int:barrier>
<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />
根据哪个节点先收到消息,要么是发送消息的线程在或者发送消息的线程释放等待最多十秒,直到另一条消息到达。
当消息被释放时,外通道会发送一条消息,结合调用自定义结果消息组处理器比恩,名字我的输出处理器.
如果主线程超时且触发器较晚到达,你可以配置一个丢弃通道,延迟触发器被发送到那里。
关于该组件的示例,请参见屏障 Samples的应用。