|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0! |
@KafkaListener生命周期管理
为 创建的监听器容器@KafkaListener在应用语境中,注释不是豆子。
相反,它们会注册在某种基础设施机构KafkaListenerEndpointRegistry.
该 bean 由框架自动声明,并管理容器的生命周期;它会自动启动所有自动启动设置为true.
所有集装箱工厂制造的集装箱必须在同一集装箱中阶段.
更多信息请参见“监听器容器自动启动”。
你可以用注册表程序管理生命周期。
启动或停止注册表会启动或停止所有注册容器。
或者,你可以通过使用其身份证属性。
你可以设置自动启动在注释上,它覆盖了容器工厂中配置的默认设置。
你可以从应用上下文中获取豆子的引用,比如自动接线,以管理其注册容器。
以下示例展示了如何实现:
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;
...
this.registry.getListenerContainer("myContainer").start();
...
注册表只维护其所管理容器的生命周期;被声明为 Beans 的容器不由注册表管理,可以从应用上下文中获取。
可以通过调用注册表获得一组受管理容器getListenerContainers()方法。
2.2.5 版本增加了一种便捷方法getAllListenerContainers()返回所有容器的集合,包括注册管理的容器和被声明为豆子的容器。
返回的集合会包含已初始化的原型豆,但不会初始化任何懒惰豆声明。
在应用上下文刷新后注册的端点将立即启动,无论它们的终端如何自动启动财产,以符合SmartLifecycle合同,其中自动启动仅在应用上下文初始化时考虑。
迟注册的一个例子是具有@KafkaListener在原型作用域中,实例是在上下文初始化后创建的。
从2.8.7版本开始,你可以设置注册表alwaysStartAfterRefresh属性到false然后容器自动启动属性将决定容器是否被启动。 |
Retrieving MessageListenerContainers from KafkaListenerEndpointRegistry
这KafkaListenerEndpointRegistry提供了检索的方法MessageListenerContainer为适应多种管理场景而设的实例:
所有容器:对于覆盖所有监听器容器的作,使用getListenerContainers()以获取一个全面的收藏。
Collection<MessageListenerContainer> allContainers = registry.getListenerContainers();
按ID指定容器:要管理单个容器,getListenerContainer(String id)通过其ID实现检索。
MessageListenerContainer specificContainer = registry.getListenerContainer("myContainerId");
动态容器过滤:在3.2版本引入,涉及两个超载getListenerContainersMatching方法使容器的精细选择成为可能。
一种方法是谓词<字符串>对于基于ID的过滤作为参数,而另一个则取BiPredicate<String, MessageListenerContainer>对于更高级的条件,可能包括容器属性或状态作为参数。
// Prefix matching (Predicate<String>)
Collection<MessageListenerContainer> filteredContainers =
registry.getListenerContainersMatching(id -> id.startsWith("productListener-retry-"));
// Regex matching (Predicate<String>)
Collection<MessageListenerContainer> regexFilteredContainers =
registry.getListenerContainersMatching(myPattern::matches);
// Pre-built Set of IDs (Predicate<String>)
Collection<MessageListenerContainer> setFilteredContainers =
registry.getListenerContainersMatching(myIdSet::contains);
// Advanced Filtering: ID prefix and running state (BiPredicate<String, MessageListenerContainer>)
Collection<MessageListenerContainer> advancedFilteredContainers =
registry.getListenerContainersMatching(
(id, container) -> id.startsWith("specificPrefix-") && container.isRunning()
);
利用这些方法高效管理和查询MessageListenerContainer你申请中的实例。