|
该版本仍在开发中,尚未被视为稳定。对于最新的稳定版本,请使用 Spring Integration 7.0.0! |
聚合
基本上,聚合器是分路器的镜像,是一种消息处理程序,接收多个消息并将其合并为一条消息。 事实上,聚合器通常是包含分路器的管道中的下游消费者。
从技术上讲,聚合器比分路器更复杂,因为它是有状态的。
它必须保存待聚合的消息,并判断整组消息何时准备好聚合。
为此,需要消息商店.
功能性
聚合器通过关联和存储一组相关消息,直到该组被认为完整。 此时,聚合器通过处理整个组生成一条消息,并将聚合后的消息作为输出发送。
实现聚合器需要提供逻辑来执行聚合(即从多个消息中创建一个单一消息)。 两个相关的概念是相关性和释放。
相关性决定了消息如何分组以便聚合。
在春季积分中,相关性默认基于IntegrationMessageHeaderAccessor.CORRELATION_ID消息头部。
与同一条信息IntegrationMessageHeaderAccessor.CORRELATION_ID被归为一组。
不过,你可以自定义相关策略,允许其他方式指定消息的分组方式。
为此,你可以实现相关策略(本章后面将详细介绍)
为了确定一组消息何时准备好处理,需要发布策略被咨询。
聚合器的默认发布策略是在序列中包含的所有消息都存在时,基于IntegrationMessageHeaderAccessor.SEQUENCE_SIZE页眉。
你可以通过提供自定义的引用来覆盖这个默认策略发布策略实现。
编程模型
聚合API由若干类组成:
-
界面
消息组处理器,及其子类:MethodInvokingAggregatingMessageGroupProcessor和ExpressionEvaluatingMessageGroupProcessor -
这
发布策略界面及其默认实现:简单序列大小释放策略 -
这
相关策略界面及其默认实现:头部属性相关策略
聚合消息处理器
这聚合消息处理器(一个子类摘要相关消息处理)是消息处理器实现,封装聚合器的共同功能(及其他相关用例),具体如下:
-
将消息关联到待聚合的组中
-
在
消息商店直到该团体能够被释放 -
决定何时可以释放该小组
-
将释放的组合并成一条消息
-
识别并响应过期组
决定如何将消息分组的责任委托给了相关策略实例。
决定消息组是否可以释放的责任被委托给发布策略实例。
以下列表展示了基地的简要亮点摘要聚合消息组处理器(实施聚合有效载荷方法由开发者自行决定):
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
看DefaultAggregatingMessageGroupProcessor,ExpressionEvaluatingMessageGroupProcessor和MethodInvokingMessageGroupProcessor作为开箱即用的摘要聚合消息组处理器.
从版本5.2开始,Function<MessageGroup, Map<String, Object>>策略适用于摘要聚合消息组处理器用于合并并计算输出消息的(聚合)头部。
这默认聚合首部函数实现方式支持返回所有组内无冲突的头部;在组内一个或多个消息中缺少头部不被视为冲突。
冲突的头部被省略。
以及新引入的授权MessageGroupProcessor,该函数用于任意(非-摘要聚合消息组处理器) 消息组处理器实现。
本质上,该框架将提供的函数注入到摘要聚合消息组处理器实例,并将所有其他实现封装为授权MessageGroupProcessor.
逻辑上的差异摘要聚合消息组处理器以及授权MessageGroupProcessor后者在调用代理策略前不会提前计算头部,且如果代理返回 a消息或摘要集成信息构建器.
在这种情况下,框架假设目标实现已经生成了一组合适的头部,并填充到返回的结果中。
这Function<MessageGroup, Map<String, Object>>策略可作为标题函数XML 配置的参考属性,作为AggregatorSpec.headersFunction()Java DSL 的选项,以及AggregatorFactoryBean.setHeadersFunction()用于纯 Java 配置。
这相关策略属于摘要相关消息处理且 的默认值基于IntegrationMessageHeaderAccessor.CORRELATION_ID消息头,如下示例所示:
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}
至于消息组的实际处理,默认实现是DefaultAggregatingMessageGroupProcessor.
它会创造一个单一消息其有效载荷为列表该组接收的有效载荷数量。
这对于简单的分散收集实现(如分路器、发布-订阅通道或上游接收者列表路由器)效果良好。
在使用发布-订阅频道或接收者列表路由器时,务必启用应用序列旗。
这样做会添加必要的头部:CORRELATION_ID,SEQUENCE_NUMBER和SEQUENCE_SIZE.
在 Spring Integration 中,分线器默认启用了该行为,但发布-订阅信道或接收者列表路由器则未启用,因为这些组件可能在多种情况下使用,这些头部并不必要。 |
在为应用实施特定的聚合策略时,你可以扩展摘要聚合消息组处理器并实现聚合有效载荷方法。
然而,有更好的解决方案,更少依赖于API,用于实现聚合逻辑,这些逻辑可以通过XML或注释进行配置。
一般来说,任何POJO如果提供一种接受单一java.util.List作为一个论元(参数化列表也被支持)。
该方法用于聚合消息,具体如下:
-
如果该参数是
java.util.Collection<T>参数类型T可分配给消息,所有为聚合而累积的消息列表都会发送给聚合器。 -
如果参数是非参数化的
java.util.Collection或者参数类型不可分配于消息,该方法接收累积消息的有效载荷。 -
如果返回类型不可分配于
消息,它被视为 的有效载荷消息这由框架自动创建。
| 为了简化代码并推广低耦合、可检验性等最佳实践,首选实现聚合逻辑的方式是通过 POJO 并在应用中使用 XML 或注释支持进行配置。 |
从5.3版本开始,处理消息组后,摘要相关消息处理执行MessageBuilder.popSequenceDetails()针对具有多个嵌套层级的适当分路器-聚合器场景进行消息头修改。
只有当消息组释放结果不是消息集合时才会这样做。
那就是目标消息组处理器负责MessageBuilder.popSequenceDetails()在建立这些信息的同时打电话。
如果消息组处理器返回 a消息一个MessageBuilder.popSequenceDetails()只有当序列细节匹配了群组中第一个消息。
(此前仅在普通有效载荷或摘要集成信息构建器已从中归还消息组处理器.)
该功能可以通过新的popSequence 布尔性质,因此MessageBuilder.popSequenceDetails()当标准分配器未填充相关细节时,可关闭。
这一特性本质上抵消了最近上游的做法应用序列 = 真在摘要消息分流器.
更多信息请参见Splitter。
这SimpleMessageGroup.getMessages()方法返回不可修改的集合.
因此,如果聚合POJO方法具有收藏<信息>参数,传递的参数正是收集实例和,当你使用一个简易消息商店对于聚合器来说,原始的收藏<信息>释放团队后被清除。
因此,收藏<信息>如果 POJO 中的变量是从聚合器传递出来的,也被清除。
如果你想直接发布该收藏以便进一步处理,必须重新构建一个新的收集(例如,新 ArrayList<Message>(messages)).
从4.3版本开始,框架不再将消息复制到新集合,以避免不必要的额外对象创建。 |
在4.2版本之前,无法提供消息组处理器通过使用 XML 配置。
只有POJO方法可用于聚合。
现在,如果框架检测到被引用的(或内部)豆实现了消息处理器它被用作聚合器的输出处理器。
如果你想从自定义中发布一组对象消息组处理器作为消息的有效载荷,你的类应扩展摘要聚合消息组处理器并实现聚合有效载荷().
此外,自4.2版本起,aSimpleMessageGroup处理器提供。
它返回来自该组的消息集合,如前所述,这会导致释放的消息被单独发送。
这使得聚合器能够作为消息屏障,将到达的消息保留,直到发布策略触发,并将该组作为一系列单独消息释放。
从6.0版本开始,上述拆分行为仅在组处理器为SimpleMessageGroup处理器.
否则,和其他任何角色消息组处理器返回收藏<信息>,只发出一条回复消息,所有消息集合作为有效载荷。
这种逻辑由聚合器的典型目的所决定——通过某个密钥收集请求消息并生成单一分组消息。
发布策略
这发布策略接口定义如下:
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}
一般来说,任何 POJO 只要提供一个接受单一java.util.List作为参数(也支持参数化列表),并返回一个布尔值。
该方法在每条新消息到达后调用,以判断该组是否完整,具体如下:
-
如果该参数是
java.util.List<T>以及参数类型T可分配为消息,组中累积的全部消息列表被发送到该方法。 -
如果参数是非参数化的
java.util.List或者参数类型不可分配于消息,该方法接收累积消息的有效载荷。 -
方法必须返回
true消息组是否准备好进行聚合,否则为false。
以下示例展示了如何使用@ReleaseStrategy注释列表类型消息:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}
以下示例展示了如何使用@ReleaseStrategy注释列表类型字符串:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}
基于前两个示例中的签名,基于POJO的释放策略通过收集尚未发布的消息(如果你需要访问全部内容)消息)或收集对于有效载荷对象(如果类型参数不是消息).
这满足了大多数使用场景。
不过,如果你因为某些原因需要访问完整版消息组你应该提供发布策略接口。
|
在处理可能规模较大的组时,你应了解这些方法是如何被调用的,因为释放策略可以在组被释放前多次调用。
最高效的是实现 基于这些原因,对于大型团队,我们建议实施 |
当该组被释放用于聚合时,所有尚未发布的消息都会被处理并从组中移除。
如果该群也是完备的(即所有来自序列的消息都已到达或没有定义序列),则该群被标记为完备。
该组的任何新消息都会发送到丢弃信道(如有定义)。
设置完成时过期组自true(默认为false) 移除整个组,任何与被移除组关联相同的新消息组成一个新组。
你可以通过使用MessageGroupStoreReaper䋰发送部分结果于到期时被设置为true.
为了方便丢弃迟到消息,聚合器必须在消息发布后保持关于该组的状态。
这最终可能导致内存不足的情况。
为了避免这种情况,你应该考虑配置一个MessageGroupStoreReaper以移除组元数据。
应设置到期参数,当到达某一点后,延迟消息预计不会到达时,组才会过期。
有关配置收割者的信息,请参见聚合器中的状态管理:MessageGroupStore. |
Spring Integration 提供了发布策略:简单序列大小释放策略.
本实现参考了SEQUENCE_NUMBER和SEQUENCE_SIZE每个到达消息的头部,用来决定消息组何时完成并准备聚合。
如前所述,这也是默认策略。
在5.0版本之前,默认发布策略是序列大小释放策略,但该方法在大群体中表现不佳。
通过这种策略,重复序列号被检测并拒绝。
这项手术可能花费不菲。 |
如果你在聚合大群,不需要释放部分群,也不需要检测/拒绝重复序列,可以考虑使用简单序列大小释放策略相反,在这些用例中,它效率更高,并且自5.0版本起默认使用,当部分组发布未被指定时。
聚合大型群体
4.3版本更改了默认设置收集对于 a 中的消息简易消息组自哈希集(之前是阻塞队列).
当从大组中移除单个消息时,这种方式成本较高(需要O(n)的线性扫描)。
虽然哈希集通常移除速度更快,但对于大型消息来说可能成本较高,因为哈希值必须在插入和移除时都计算。
如果你有需要哈希的消息,可以考虑使用其他类型的集合。
如同相关内容用消息组工厂一个简易消息组工厂提供给你选择收集这最适合你的需求。
你也可以提供自己的工厂实现来创建其他收藏<消息<?>>.
以下示例展示了如何配置聚合器,使用之前的实现和简单序列大小释放策略:
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果Filter端点涉及聚合器上游的流,序列大小释放策略(固定或基于序列大小头部)无法发挥其作用,因为序列中的某些消息可能会被过滤器丢弃。
在这种情况下,建议选择其他方案发布策略,或者使用从丢弃子流发送的补偿消息,其中包含内容中某些信息,在自定义完整群函数中可跳过。
更多信息请参见筛选。 |
相关性策略
这相关策略接口定义如下:
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
}
该方法返回对象代表用于将消息与消息组关联的关联键。
密钥必须满足用于密钥的条件地图关于等值()和hashCode().
一般来说,任何POJO都可以实现相关逻辑,且将消息映射到方法参数(或多个参数)的规则与服务激活器(包括支持@Header注释)。
方法必须返回一个值,且该值不得为零.
Spring Integration 提供了相关策略:头部属性相关策略.
该实现返回其中一个消息头部(其名称由构造函数参数指定)作为关联键。
默认情况下,相关策略为头部属性相关策略返回 的值CORRELATION_ID标题属性。
如果你有自定义的头部名称想用于关联,可以在头部属性相关策略并以此作为聚合器相关策略的参考。
锁注册处
对组的更改是线程安全的。
所以,当你同时发送同一相关ID的消息时,聚合器中只会处理其中一个消息,实际上是单线程的每条消息组。
一个锁注册用于获得已解析相关ID的锁。
一个默认锁注册表默认使用(内存内)。
用于在服务器间同步更新,共享MessageGroupStore如果正在使用,你必须配置共享锁注册表。
避免僵局
如上所述,当消息组发生变异(消息添加或释放)时,会被保留锁。
考虑以程:
...->aggregator1-> ... ->aggregator2-> ...
如果有多个线程,且聚合器共享一个共同锁注册表,就有可能出现死锁。
这会导致挂线和jstack <PID>可能得到如下结果:
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
有几种方法可以避免这个问题:
-
确保每个聚合器拥有自己的锁注册表(锁可以是跨应用实例共享的注册表,但流程中的两个或多个聚合器必须各自拥有独立的注册表)
-
使用一个
执行者频道或队列通道作为聚合器的输出通道,使下游流运行在新的线程上 -
从5.1.1版本开始,设置
releaseLockBeforeSend聚合器属性 到true
| 如果某个聚合器的输出最终被路由回同一个聚合器,也可能导致这个问题。 当然,上述第一个解决方案在这种情况下不适用。 |
在Java DSL中配置聚合器
请参阅聚合器和重序列器,了解如何在 Java DSL 中配置聚合器。
使用 XML 配置聚合器
Spring 集成支持通过<聚合器/>元素。
以下示例展示了聚合器的示例:
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" (1)
auto-startup="true" (2)
input-channel="inputChannel" (3)
output-channel="outputChannel" (4)
discard-channel="throwAwayChannel" (5)
message-store="persistentMessageStore" (6)
order="1" (7)
send-partial-result-on-expiry="false" (8)
send-timeout="1000" (9)
correlation-strategy="correlationStrategyBean" (10)
correlation-strategy-method="correlate" (11)
correlation-strategy-expression="headers['foo']" (12)
ref="aggregatorBean" (13)
method="aggregate" (14)
release-strategy="releaseStrategyBean" (15)
release-strategy-method="release" (16)
release-strategy-expression="size() == 5" (17)
expire-groups-upon-completion="false" (18)
empty-group-min-timeout="60000" (19)
lock-registry="lockRegistry" (20)
group-timeout="60000" (21)
group-timeout-expression="size() ge 2 ? 100 : -1" (22)
expire-groups-upon-timeout="true" (23)
scheduler="taskScheduler" > (24)
<expire-transactional/> (25)
<expire-advice-chain/> (26)
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
| 1 | 聚合器的ID是可选的。 |
| 2 | 生命周期属性指示聚合器是否应在应用上下文启动时启动。 可选(默认为“true”)。 |
| 3 | 聚合器接收消息的通道。 必填。 |
| 4 | 聚合器发送聚合结果的通道。 可选(因为收到的消息可以在“replyChannel”消息头中指定回复通道)。 |
| 5 | 聚合器发送超时消息的信道(如果发送部分结果于到期时是false).
自选。 |
| 6 | 对MessageGroupStore用于在其关联键下存储消息组,直到它们完成。
自选。
默认情况下,它是一个挥发性内存存储。
更多信息请参见消息存储。 |
| 7 | 当多个句柄订阅同一句柄时,该聚合器的顺序直达频道(用于负载均衡目的)。
自选。 |
| 8 | 表示过期消息应被汇总并发送到包含的“输出通道”或“回复通道”消息组已过期(参见MessageGroupStore.expireMessageGroups(long)).
一种过期方式消息组是通过配置MessageGroupStoreReaper.
不过,你也可以选择过期消息组通过呼叫MessageGroupStore.expireMessageGroups(timeout).
你可以通过控制总线作实现这一点,或者如果你有对MessageGroupStore实例,通过调用expireMessageGroups(timeout).
否则,单独这个属性就没有任何作用。
它仅作为指示,判断是否丢弃或发送仍处于消息组那条快到期了。
可选(默认为false).
注意:这个属性可能更准确地说是超时时发送部分结果,因为该群实际上可能不会失效,如果超时过期的分组设置为false. |
| 9 | 发送回复时等待的超时间隔消息前往输出通道或丢弃通道.
默认30秒。
只有当输出通道存在某些“发送”限制时,才会应用,例如:队列通道但有固定的“容量”。
在这种情况下,一个MessageDeliveryException被抛出。
为摘要订阅频道实现,发送超时被忽略。
为group-timeout(-表达式)这MessageDeliveryException从计划中的过期任务开始,该任务被重新安排。
自选。 |
| 10 | 引用一个实现消息相关(分组)算法的豆子。
豆子可以是相关策略界面或POJO。
在后一种情况下,相关策略方法属性也必须被定义。
可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID页首)。 |
| 11 | 定义在 由相关策略.
它实现了相关决策算法。
可选,但有限制(相关策略必须存在)。 |
| 12 | 一个表示相关策略的SpEL表达式。
例:“headers['something']”.
只有其中一个相关策略或相关-策略-表达是允许的。 |
| 13 | 在应用上下文中定义的豆子的引用。 豆子必须实现如前所述的聚合逻辑。 可选(默认情况下,聚合消息列表成为输出消息的有效载荷)。 |
| 14 | 定义在由裁判属性。
它实现了消息聚合算法。
可选(这取决于情况裁判属性正在定义中)。 |
| 15 | 指的是实现释放策略的豆子。
豆子可以是发布策略界面或POJO。
在后一种情况下,发布策略方法属性也必须被定义。
可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZE标题属性)。 |
| 16 | 定义在由释放策略属性。
它实现了完备判定算法。
可选,但有限制(释放策略必须存在)。 |
| 17 | 一个代表发布策略的 SpEL 表达式。
表达式的根对象是消息组.
例:“尺寸()== 5”.
只有其中一个释放策略或释放-策略-表达是允许的。 |
| 18 | 当设置为true(默认为false完成的组会从消息存储中移除,后续具有相同关联的消息则形成新的组。
默认行为是发送与完成组相对于丢弃通道. |
| 19 | 仅当MessageGroupStoreReaper配置为消息商店关于<聚合器>.
默认情况下,当MessageGroupStoreReaper配置为部分组的过期,空组也会被移除。
空组通常在释放组后出现。
空组用于检测和丢弃迟到消息。
如果你希望空组的过期时间比部分组过期更长,可以设置此属性。
空组则不会从中移除消息商店直到至少在这毫秒内未被修改。
注意,空组的实际到期时间也会受到收割者超时属性,可以是这个值加上超时。 |
| 20 | 对org.springframework.integration.util.LockRegistry豆。
它曾经获得锁基于组ID对于并行作消息组.
默认情况下,内部默认锁注册表被使用。
分布式的使用锁注册,例如Zookeeper 锁注册表确保聚合器只有一个实例可以同时在一个组上运行。
更多信息请参见Redis Lock Registry或Zookeeper Lock Registry。 |
| 21 | 超时(以毫秒计)来强制消息组当发布策略当前消息到达时不会释放该组。
该属性为聚合器提供了内置的时间基础释放策略,当需要发出部分结果(或丢弃该组)时,如果没有新的消息到达消息组在超时时间内,从最后一条消息到达开始算。
设置一个超时,从时间开始计算消息组被创建,见组-超时-表达式信息。
当新消息到达聚合器时,任何已有的消息ScheduledFuture<?>对于其消息组被取消了。
如果发布策略返回false(意为不放手)和小组暂停> 0,一个新任务被安排以终止该组。
我们不建议将该属性设为零(或为负值)。
这样做实际上会禁用聚合器,因为每个消息组都会立即完成。
不过,你可以通过使用表达式来有条件地将其设置为零(或负值)。
看组-超时-表达式获取信息。
完成过程中采取的动作取决于发布策略以及send-partial-group-on-expiry属性。
更多信息请参见聚合器和组超时。
它与“组-超时-表达式”属性互斥。 |
| 22 | SpEL表达式,使得小组暂停其中消息组作为#root评估上下文对象。
用于调度消息组被迫完成。
如果该表达式对应为零,完成过程未被安排。
如果计算为零,则该组立即在当前线程上完成。
实际上,这形成了一种动态小组暂停财产。
举个例子,如果你想强制完成一个消息组自组建以来10秒后,你可以考虑使用以下SpEL表达式:时间戳 + 10000 - T(System).currentTimeMillis()哪里时间戳由MessageGroup.getTimestamp()作为消息组这里是#root评估上下文对象。
但请注意,组的创建时间可能与第一个到达消息的时间不同,这取决于其他组到期属性的配置。
看小组暂停更多信息请见。
与“群体暂停”属性互斥。 |
| 23 | 当一个小组因超时(或MessageGroupStoreReaper),该组默认已过期(完全移除)。
迟到的消息会启动新群组。
你可以把它设置为false以完成该组,但其元数据保持不变,以便丢弃迟到的消息。
空组可以随后通过MessageGroupStoreReaper与空组最小超时属性。
默认为“true”。 |
| 24 | 一个任务调度器BEAN 引用以安排消息组如果没有新的消息到达,则强制完成消息组在小组暂停.
如果没有提供,默认调度器(任务调度器)注册于应用上下文 (ThreadPoolTaskScheduler)被使用。
该属性不适用于小组暂停或组-超时-表达式未具体说明。 |
| 25 | 自4.1版本起。
它允许对forceComplete(力道完成)操作。
它从一个group-timeout(-表达式)或者由MessageGroupStoreReaper且不应用于正规加,释放和丢弃操作。
只有该子元素或<过期-建议链/>是允许的。 |
| 26 | 自4.1版本起。
它允许配置任意建议对于forceComplete(力道完成)操作。
它从一个group-timeout(-表达式)或者由MessageGroupStoreReaper且不应用于正规加,释放和丢弃操作。
只有该子元素或<过期事务/交易>是允许的。
一笔交易建议也可以在这里使用Spring进行配置德州Namespace。 |
|
过期组别
关于过期(完全移除)组有两个属性。
当一个组过期时,不会有记录;如果新消息以相同相关性到达,则会启动一个新的组。
当一个组完成(无过期)时,空组依然存在,迟到的消息被丢弃。
空组可以通过后续使用
如果一组未正常完成,但因超时被释放或丢弃,通常该组会失效。
从4.1版本开始,你可以通过以下方式来控制这种行为
自5.0版本起,空组也会在之后被安排移除 从5.4版本开始,聚合器(和重排序器)可以配置为终止孤儿组(即存在于持久消息存储中可能不会发布的组)。
这 |
我们通常建议使用裁判属性,如果可以引用自定义聚合器处理程序的实现,则可在其他中引用<聚合器>定义。
然而,如果一个自定义聚合器实现仅由单一定义使用<聚合器>你可以使用内部豆定义(从版本1.0.3开始)在<聚合器>元素,如下例所示:
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
同时使用裁判属性和内部豆子定义在同一个<聚合器>不允许配置,因为这会产生歧义。
在这种情况下,会抛出异常。 |
以下示例展示了聚合豆的实现:
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}
前述示例中完成策略豆的实现可能如下:
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}
| 在合适的地方,发布策略法和聚合法可以合并成一个整体。 |
上述示例中相关策略豆的实现可能如下:
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}
前述示例中的聚合器会按某种标准(此处为除以十后的剩余数)分组,并保持该组直到有效载荷提供的数字之和超过某个值。
| 在合适的地方,释放策略法、相关策略法和聚合法可以合并在一个豆子里。 (实际上,它们全部或任意两个都可以组合。) |
聚合器与 Spring 表达式语言(SpEL)
自 Spring Integration 2.0 起,你可以用 SpEL 处理各种策略(关联、发布和聚合),如果发布策略背后的逻辑相对简单,我们推荐采用这种方法。
假设你有一个遗留组件,设计用来接收一组对象。
我们知道默认发布策略将所有聚合消息汇聚于列表.
现在我们有两个问题。
首先,我们需要从列表中提取单独的消息。
其次,我们需要提取每个消息的有效载荷并组装对象数组。
以下示例解决了这两个问题:
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<String>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}
然而,使用 SpEL,这种需求其实可以用一行表达式相对容易地处理,从而避免了写自定义类并将其配置为 bean 的过程。 以下示例展示了如何实现:
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
在之前的配置中,我们使用集合投影表达式从列表中所有消息的有效载荷中组装出一个新的集合,然后将其转换为数组,从而实现了与之前 Java 代码相同的结果。
在处理自定义释放和相关策略时,也可以采用同样的基于表达式的方法。
而不是为习俗定义豆子相关策略在相关策略属性,你可以将简单的相关逻辑实现为 SpEL 表达式,并在相关-策略-表达属性,如下例所示:
correlation-strategy-expression="payload.person.id"
在前述例子中,我们假设有效载荷具有人属性身份证,将用于关联消息。
同理,对于发布策略你可以将发布逻辑实现为 SpEL 表达式,并在释放-策略-表达属性。
评估上下文的根对象是消息组本身。
这列表可以通过以下方式引用 。消息表达式中群的属性。
在5.0版本之前的版本中,根对象是留言<?>,如前例所示: |
release-strategy-expression="!messages.?[payload==5].empty"
在上述示例中,SpEL评估上下文的根对象是消息组你说,一旦出现带有有效载荷的消息5在这个群体中,应该释放这个群体。
聚合器与组超时
从4.0版本开始,引入了两个互斥属性:小组暂停和组-超时-表达式.
参见“配置 XML 聚合器”。
在某些情况下,如果发布策略当前消息到达时不会释放。
为此,小组暂停选项:调度消息组被迫完成,如下示例所示:
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
在此示例中,如果聚合器收到了按顺序定义的最后一条消息,则正常释放是可能的释放-策略-表达.
如果该消息没有到达,小组暂停只要该组至少包含两个消息,则强制该组在十秒后完成。
强制该群完成的结果取决于发布策略以及发送部分结果于到期时.
首先,再次咨询释放策略,以确定是否需要进行正常释放。
虽然该组织未曾改变,发布策略目前可以决定释放该组合。
如果释放策略仍然无法释放该组,该策略即告失效。
如果发送部分结果于到期时是true,(部分)中已有的消息消息组作为正常聚合器回复消息发布给输出通道.
否则,该物品将被弃用。
两者之间是有区别的小组暂停行为和MessageGroupStoreReaper(参见使用 XML 配置聚合器)
收割者对所有玩家发起强制完成消息组s 在MessageGroupStore周期性地。
这小组暂停每个人都做消息组如果在小组暂停.
此外,收割器还可用于移除空组(空组用于丢弃延迟消息完成时过期组是错误的)。
从5.5版本开始,组超时表达式可以被评估为java.util.Date实例。
这在如根据组创建时间确定预定任务时刻等情况下非常有用(MessageGroup.getTimestamp())而非计算时的当前消息到达组超时表达式被评估为长:
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
配置带有注释的聚合器
以下示例展示了一个配置有注释的聚合器:
public class Waiter {
...
@Aggregator (1)
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy (2)
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy (3)
public String correlateBy(OrderItem item) {
...
}
}
| 1 | 注释说明该方法应用作聚合器。 如果该类被用作聚合器,必须明确说明。 |
| 2 | 注释说明该方法被用作聚合器的发布策略。
如果在任何方法中都不存在,聚合器则使用简单序列大小释放策略. |
| 3 | 注释说明该方法应作为聚合器的相关策略使用。
如果没有相关性策略,聚合器会使用头部属性相关策略基于CORRELATION_ID. |
XML元素提供的所有配置选项也适用于@Aggregator注解。
聚合器可以明确引用 XML或如果@MessageEndpoint定义在类上,通过类路径扫描自动检测。
注释配置(@Aggregator以及其他)聚合器组件只涵盖简单场景,大多数默认选项已足够。
如果你在使用注释配置时需要对这些选项有更多控制,可以考虑使用@Bean定义聚合消息处理器并标记其@Bean方法:@ServiceActivator,如下示例所示:
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}
从4.2版本开始,聚合器FactoryBean可用来简化 Java 配置聚合消息处理器. |
聚合器中的状态管理:MessageGroupStore
聚合器(以及 Spring Integration 中的某些其他模式)是一种有状态模式,要求基于一组在一段时间内到达的消息做出决策,这些消息都具有相同的相关键。
有状态模式中接口的设计(例如发布策略)的驱动原则是组件(无论是由框架定义还是由用户定义)都应能够保持无状态状态。
所有状态都由消息组其管理权委托给MessageGroupStore.
这MessageGroupStore接口定义如下:
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}
更多信息请参见Javadoc。
这MessageGroupStore积累状态信息消息组在等待释放策略触发期间,而那个事件可能永远都不会发生。
因此,为了防止陈旧消息持续存在,并且让易失的存储在应用关闭时提供清理的钩子,MessageGroupStore允许你注册回调以应用到消息组当他们过期时。
界面非常直观,如下列表所示:
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
回调可以直接访问存储和消息组,以便管理持久状态(例如,完全将组从存储中移除)。
这MessageGroupStore维护这些回调列表,按需应用于所有时间戳早于参数时间的消息(参见registerMessageGroupExpiryCallback(..)和expireMessageGroups(..)方法,前面描述)。
重要的是不要使用相同的信息MessageGroupStore在不同的聚合器组件中,当你打算依赖expireMessageGroups功能性。
每摘要相关消息处理注册自己的消息组回拨基于forceComplete()回调。
这样,每个待到期的组都可以被错误的聚合器完成或丢弃。
从版本5.0.10开始,UniqueExpiryCallback是从摘要相关消息处理对于注册回拨,在MessageGroupStore.
这MessageGroupStore而,则检查该类实例的存在,并在回调集中已有错误时,记录错误并发送相应消息。
这样,框架就不允许使用MessageGroupStore在不同的聚合器/重排序器中实例化,以避免上述的过期副作用,导致特定相关处理程序未创建的组。 |
你可以打电话给expireMessageGroups方法,带有超时值。
任何比当前时间减去该值更早的消息均已过期,回调已生效。
因此,存储用户定义了消息组“过期”的含义。
为了方便用户,Spring Integration 提供了一个封装消息的封装器,形式为MessageGroupStoreReaper,如下示例所示:
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
死神是可跑.
在前述示例中,消息组存储的过期方法每十秒调用一次。
超时时间是30秒。
需要理解的是,“超时”属性MessageGroupStoreReaper是一个近似值,并且受任务调度器速率的影响,因为该属性仅在下一次计划执行时才被检查,MessageGroupStoreReaper任务。
例如,如果超时时间设置为十分钟,但MessageGroupStoreReaper任务每小时运行一次,最后一次执行MessageGroupStoreReaper任务发生在暂停前一分钟,消息组接下来的59分钟内不会失效。
因此,我们建议将超时率设定为至少等于或更短的超时值。 |
除了收割器外,当应用程序通过生命周期回调关闭时,也会调用到期回调摘要相关消息处理.
这摘要相关消息处理注册自身的到期回调,这就是带有布尔标志的链接发送部分结果于到期时在聚合器的XML配置中。
如果标志设置为true然后,当调用到期回调时,尚未释放的未标记消息可以发送到输出通道。
自从......MessageGroupStoreReaper从调度任务中调用,可能会生成一条消息(取决于sendPartialResultOnExpiry选项)对下游集成流程的建议提供自定义任务调度器其中消息发布错误处理通过errorChannel,这可能是常规聚合器发布功能所预期的。
同样的逻辑也适用于组超时功能,该功能同样依赖于任务调度器.
更多信息请参见错误处理。 |
|
当共享时 一些 欲了解更多关于 |
通量聚合器
在5.2版本中,FluxAggregatorMessageHandler组件已被引入。
它基于反应堆项目Flux.groupBy()和Flux.window()运营商。
收到的消息会被发送到通量汇由Flux.create()在该组件的构造器中。
如果输出通道不提供或不是 的实例ReactiveStreamsSubscribeableChannel(可订阅频道),主订阅通量由Lifecycle.start()实现。
否则,将推迟到由ReactiveStreamsSubscribeableChannel(可订阅频道)实现。
消息按Flux.groupBy()使用相关策略对于组键。
默认情况下,IntegrationMessageHeaderAccessor.CORRELATION_ID查询消息的头部。
默认情况下,每个关闭窗口都以通量在消息的有效载荷中产生。
该消息包含窗口中第一个消息的所有头部。
这通量在输出消息中,有效载荷必须订阅并下游处理。
这样的逻辑可以被setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)配置选项FluxAggregatorMessageHandler.
例如,如果我们希望有列表在最后一条消息中,我们可以配置Flux.collectList()像下面这样:
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));
在FluxAggregatorMessageHandler选择合适的窗口策略:
-
setBoundaryTrigger(谓词<消息<?>>)- 传播到Flux.windowUntil()算子。 更多信息请参见其JavaDocs。 它优先于所有其他窗户选项。 -
setWindowSize(int)和setWindowSizeFunction(Function<Message<?>, Integer>)- 传播到Flux.window(int)或windowTimeout(int, Duration). 默认情况下,窗口大小是从组中的第一个消息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE页眉。 -
setWindowTimespan(Duration)- 传播到通量窗口(持续时间)或windowTimeout(int, Duration)这取决于窗口大小的配置。 -
setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)- 一个函数,用于对任何未被暴露选项覆盖的自定义窗口作应用到分组通量中的变换。
由于该分量是消息处理器实现它可以简单地用作@Bean与@ServiceActivator消息注释。
通过Java DSL,它可以从中使用.handle()EIP方法。
下面的示例展示了我们如何注册一个集成流程运行时以及如何FluxAggregatorMessageHandler可以与上游的分流器相关联:
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);