对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0spring-doc.cadn.net.cn

消息监听器容器

MessageListenerContainer以下实现如下:spring-doc.cadn.net.cn

KafkaMessageListenerContainer接收单线程中所有主题或分区的所有消息。 这ConcurrentMessageListenerContainer代表们KafkaMessageListenerContainer实例以实现多线程消费。spring-doc.cadn.net.cn

从2.2.7版本开始,你可以添加一个记录拦截者到听众容器;在呼叫听众允许检查或修改记录之前,该命令会被调用。 如果拦截器返回空,监听器不会被调用。 从2.7版本开始,它增加了在监听器退出后调用的方法(通常是,或通过抛出异常)。 此外,从2.7版本开始,现在有了批次拦截者,为批处理监听器提供了类似的功能。 此外,消费者意识记录拦截者(和批次拦截者)提供访问消费者<?, ?>. 例如,这可以用来访问拦截器中的消费者指标。spring-doc.cadn.net.cn

你不应执行任何影响消费者位置和/或在这些拦截器中提交偏移的方法;容器需要管理这些信息。
如果拦截者通过创建新的记录来变异该记录,主题,分区抵消必须保持不变,以避免意外副作用如创纪录的损失。

合成记录拦截者复合批次拦截者可用于调用多个拦截器。spring-doc.cadn.net.cn

默认情况下,从2.8版本开始,使用事务时,拦截器会在交易开始前被调用。 你可以设置监听器容器的拦截在Tx之前属性到false而是在交易开始后调用拦截器。 从2.9版本开始,这适用于任何事务管理器,而不仅仅是KafkaAwareTransactionManagers. 例如,这允许拦截者参与容器发起的 JDBC 事务。spring-doc.cadn.net.cn

从版本 2.3.8、2.4.6 开始,ConcurrentMessageListenerContainer现在支持当并发大于一时的静态成员资格。 这group.instance.id后缀为-nn起点为1. 此外,还有更高的session.timeout.ms可以用来减少重平衡事件,例如在应用实例重启时。spring-doc.cadn.net.cn

KafkaMessageListenerContainer

可用构造器如下:spring-doc.cadn.net.cn

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

它获得了消费者工厂以及关于主题和分区的信息,以及在 a 中的其他配置容器属性对象。容器属性具有以下构造子:spring-doc.cadn.net.cn

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)

第一个构造子取一个数组主题分区偏移参数,用来明确指示容器使用哪些分区(使用消费者)assign()方法)并带有可选的初始偏移。 正值默认是绝对偏移。 负值相对于当前分区内的最后偏移量,默认为负值。 一个 的构造子主题分区偏移这需要额外的布尔提出了论点。 如果这是true,初始偏移量(正负值)相对于该消费者当前位置。 这些偏移在容器启动时应用。 第二种是多个主题的数组,卡夫卡根据group.id财产——在群体中分配分区。 第三个使用正则表达式模式选择主题。spring-doc.cadn.net.cn

赋予一个消息监听器对于容器,你可以使用ContainerProps.setMessageListener在创建容器时。 以下示例展示了如何实现:spring-doc.cadn.net.cn

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

在创建DefaultKafkaConsumerFactory,使用仅取上述属性的构造函数意味着键和值反串化器类别是从配置中获取的。 或者反串化器实例可以传递给DefaultKafkaConsumerFactory构造函数 用于键和/或值,此时所有消费者共享相同的实例。 另一种选择是提供提供商<去串行器>s(从版本2.3开始)将用于获得单独的反串化器每个实例消费者:spring-doc.cadn.net.cn

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

请参见 Javadoc容器属性关于你可以设置的各种属性的更多信息。spring-doc.cadn.net.cn

自2.1.1版本起,新增了一个性质logContainerConfig已开放。 什么时候true信息启用日志,每个监听器容器都会写一条日志消息,总结其配置属性。spring-doc.cadn.net.cn

默认情况下,主题偏移提交的日志会在调试Logging层。 从2.1.2版本开始,一个属性在容器属性commitLogLevel可以指定这些消息的日志级别。 例如,将对数级别更改为信息,你可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);.spring-doc.cadn.net.cn

从版本 2.2 开始,新增了一个容器属性,称为缺失主题致命已添加(默认:false自2.3.4起)。 这样如果中介中没有任何配置的主题,容器就无法启动。 如果容器配置为监听主题模式(正则表达式),则不适用。 此前,容器线程在consumer.poll()方法是在记录大量消息的同时等待主题出现。 除了日志,没有任何问题的迹象。spring-doc.cadn.net.cn

自2.8版本起,新增了container属性authExceptionRetryInterval已经被引入。 这会导致容器在收到任何消息后重新尝试获取消息认证异常授权例外来自卡夫卡消费者. 例如,当配置用户被拒绝阅读某个主题或凭证错误时,就会发生这种情况。 定义authExceptionRetryInterval允许容器在获得适当权限后恢复。spring-doc.cadn.net.cn

默认情况下,不会配置任何间隔——认证和授权错误被视为致命,导致容器停止。

从2.8版本开始,创建消费者工厂时,如果你作为对象(在构造函数中或通过设置器)提供反串行器,工厂会调用configure()用配置属性配置它们的方法。spring-doc.cadn.net.cn

ConcurrentMessageListenerContainer

单一构造子类似于KafkaListenerContainer构造 函数。 以下列表显示了构造者的签名:spring-doc.cadn.net.cn

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

它还有一个并发财产。 例如container.setConcurrency(3)创造了三个KafkaMessageListenerContainer实例。spring-doc.cadn.net.cn

如果容器属性为主题(或主题模式)配置,Kafka 会利用其组管理功能将分区分配给消费者。spring-doc.cadn.net.cn

当听多个主题时,默认的分区分布可能与你预期的不符。 例如,如果你有三个主题,每个主题有五个分区,你想使用并发=15你只看到五个活跃消费者,每个对象被分配一个分区,其他10个消费者处于空闲状态。 这是因为默认的卡夫卡ConsumerPartitionAssignor范围分配器(详见其Javadoc)。 在这种情况下,你可以考虑使用轮转分配器而是将分区分配到所有消费者之间。 然后,每个消费者被分配一个主题或分区。 要更改ConsumerPartitionAssignor,你可以设置partition.assignment.strategy消费者财产(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)在提供给DefaultKafkaConsumerFactory.spring-doc.cadn.net.cn

使用Spring Boot时,你可以按以下方式分配策略:spring-doc.cadn.net.cn

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

当容器属性配置为主题分区偏移s, theConcurrentMessageListenerContainer分布主题分区偏移代表各项实例KafkaMessageListenerContainer实例。spring-doc.cadn.net.cn

比如说,六个人主题分区偏移提供了实例,并且并发3;每个容器有两个分区。 五人主题分区偏移实例中,两个容器获得两个分区,第三个容器拥有一个分区。 如果并发大于的主题分区并发向下调整,使每个容器获得一个分区。spring-doc.cadn.net.cn

client.id如果设定,属性附加为-n哪里n是对应并发的消费者实例。 这是在启用 JMX 时为 MBean 提供唯一名称所必需的。

从1.3版本开始,MessageListenerContainer提供访问底层指标的权限卡夫卡消费者. 在ConcurrentMessageListenerContainer指标()方法返回所有目标的度量KafkaMessageListenerContainer实例。 这些指标被分组为Map<MetricName, ?扩展了公制>客户端ID为基础条件卡夫卡消费者.spring-doc.cadn.net.cn

从2.3版本开始,容器属性提供idleBetweenPolls让监听器容器中的主循环在中间休眠的选项KafkaConsumer.poll()调用。 从提供的选项中选择实际的睡眠间隔作为最小值,以及max.poll.interval.ms消费者配置和当前记录批处理时间。spring-doc.cadn.net.cn

提交偏移量

提供多种抵消选项。 如果enable.auto.commit消费财产是trueKafka 根据其配置自动提交偏移量。 如果是false,容器支持多个AckMode设置(下表描述)。 默认AckModeBatch. 从2.3版本开始,框架集enable.auto.commitfalse除非配置中明确设置。 之前,卡夫卡默认(true如果该属性未被设置,则使用)。spring-doc.cadn.net.cn

消费者poll()方法返回一个或多个消费者唱片. 这消息监听器每张唱片都被调用。 以下列表描述了容器对每个角色所采取的动作AckMode(当交易未被使用时):spring-doc.cadn.net.cn

使用事务时,偏移量会发送到事务中,语义等价于记录Batch,取决于监听器类型(记录或批处理)。spring-doc.cadn.net.cn

手动MANUAL_IMMEDIATE要求听者必须是确认信息听众或者BatchAcknowledgegingMessageListener. 参见“信息听众”。

具体情况同步提交容器属性,该提交同步(commitSync)commitAsync()采用消费者的方法。同步提交true默认情况下;参见setSyncCommitTimeout. 看setCommitCallback获取异步提交的结果;默认回调为LoggingCommitCallback该系统记录错误(以及调试层面的成功)。spring-doc.cadn.net.cn

由于监听器容器有自己的提交偏移量机制,它更倾向于 KafkaConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG存在false. 从2.3版本开始,除非在消费者工厂中特别设置或容器的消费者属性覆盖,否则它会无条件地将其设置为false。spring-doc.cadn.net.cn

确认具有以下方法:spring-doc.cadn.net.cn

public interface Acknowledgment {

    void acknowledge();

}

该方法使监听者能够控制何时提交偏移。spring-doc.cadn.net.cn

从2.3版本开始,确认接口还有两种额外的方法nack(长眠)NACK(智力指数,长时间睡眠). 第一个用于唱片监听器,第二个用于批量监听器。 调用错误的方法会触发非法州例外.spring-doc.cadn.net.cn

如果你想提交部分批次,使用Nack(),使用事务时,设AckMode手动;调用Nack()将成功处理记录的偏移量发送到交易。
Nack()只能在调用你的监听者的消费者线程中调用。
Nack()在使用“乱序提交”时不允许。

和唱片听众一起,当Nack()调用,提交任何待处理的偏移量,丢弃上次轮询剩余记录,并对其分区执行寻址,使失败记录和未处理记录在下一个分区重新交付poll(). 消费者可以在重新送达前暂停,方法是设置论点。 这类似于在容器配置为默认错误处理.spring-doc.cadn.net.cn

Nack()暂停整个监听器,达到指定的睡眠时长,包括所有分配的分区。

使用批处理监听器时,你可以在失败发生的批处理中指定索引。 什么时候Nack()调用时,记录会先提交偏移量,然后对失败和丢弃记录的分区执行索引和寻道,以便下一次记录重新交付poll().spring-doc.cadn.net.cn

消费者在睡眠期间被暂停,以便我们继续对经纪人进行投票,以保持消费者的生命。 实际的睡眠时间和分辨率取决于容器的投票时间默认时间为5秒。 最小睡眠时间等于投票时间而且所有睡眠时间都会是它的倍数。 对于短暂的睡眠时间,或者为了提高准确性,可以考虑减少容器的数量投票时间.

从3.0.10版本开始,批处理监听者可以提交批处理部分的偏移量,使用确认(索引)确认论点。 当调用该方法时,索引处记录的偏移量(以及所有之前的记录)都会被提交。 叫确认()部分批次提交完成后,将提交批处理剩余部分的偏移量。 以下限制适用:spring-doc.cadn.net.cn

这些限制会被强制执行,方法会抛出IllegalArgumentException非法州例外,取决于违规的程度。spring-doc.cadn.net.cn

Listener Container Auto Startup

监听器容器实现SmartLifecycle自动启动true默认。 容器在晚期阶段开始(Integer.MAX值 - 100). 其他实现组件SmartLifecycle,处理监听者数据,应从更早阶段开始。 这- 100为后续阶段留出空间,使组件在容器后自动启动。spring-doc.cadn.net.cn