该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0spring-doc.cadn.net.cn

监测

监听者表现监控

从2.3版本开始,监听器容器将自动创建并更新Micrometer定时器如果千分尺在类路径上检测到,且MeterRegistry存在于应用上下文中。 通过设置ContainerPropertyMicroEnabledfalse.spring-doc.cadn.net.cn

会维持两个计时器——一个用于成功呼叫监听者,另一个用于失败。spring-doc.cadn.net.cn

计时器被命名Spring.kafka.listener并具有以下标签:spring-doc.cadn.net.cn

你可以用容器属性微米标签财产。spring-doc.cadn.net.cn

从2.9.8、3.0.6版本开始,你可以在容器属性微米标签提供者;该函数接收到消费者记录<?, ?>并返回可以基于该记录并与 中任意静态标签合并的标签微米标签.spring-doc.cadn.net.cn

通过并发容器,会为每个线程创建计时器,并且名称标签后缀为-n其中 n 是0并发-1.

监控KafkaTemplate性能

从2.5版本开始,模板将自动创建和更新Micrometer。定时器s 表示发送作,如果千分尺在类路径上检测到,且MeterRegistry存在于应用上下文中。 通过设置模板的MicroEnabled属性到false.spring-doc.cadn.net.cn

会维持两个计时器——一个用于成功呼叫监听者,另一个用于失败。spring-doc.cadn.net.cn

计时器被命名spring.kafka.template并具有以下标签:spring-doc.cadn.net.cn

你可以用模板添加额外的标签微米标签财产。spring-doc.cadn.net.cn

从2.9.8、3.0.6版本开始,你可以提供KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>)财产;该函数接收到制作人唱片<?, ?>并返回可以基于该记录并与 中任意静态标签合并的标签微米标签.spring-doc.cadn.net.cn

微米本地度规

从2.5版本开始,该框架提供了工厂监听器以管理微表KafkaClientMetrics例如,生产者和消费者被创建并关闭。spring-doc.cadn.net.cn

要启用此功能,只需将听众添加到你的生产者和消费者工厂:spring-doc.cadn.net.cn

@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
    Map<String, Object> configs = consumerConfigs();
    ...
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
    ...
    cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return cf;
}

@Bean
public ProducerFactory<String, String> myProducerFactory() {
    Map<String, Object> configs = producerConfigs();
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
    ...
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
    ...
    pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return pf;
}

消费者/生产者身份证传递给监听者的信号会加到计量器的标签上,并带有标签名称spring.id.spring-doc.cadn.net.cn

一个获得卡夫卡度量的例子
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
                .tag("customTag", "customTagValue")
                .tag("spring.id", "myProducerFactory.myClientId-1")
                .functionCounter()
                .count();

StreamsBuilderFactoryBean- 参见KafkaStreams的微米支持spring-doc.cadn.net.cn

从3.3版本开始,KafkaMetricsSupport引入抽象类以管理io.micrometer.core.instrument.binder.kafka.KafkaMetrics绑定成MeterRegistry提供Kafka客户端。 这个职业就是上述的超级技能MicrometerConsumerListener,MicrometerProducerListenerKafkaStreamsMicrometerListener. 不过,它可以用于任何 Kafka 客户端的用例。 课程需要扩展,而且它的bindClient()unbindClient()需要调用 API 来连接 Kafka 客户端的指标与微米收集器。spring-doc.cadn.net.cn

微米观测

自3.0版本起,支持使用测距进行观测,适用于卡夫卡模板以及听众容器。spring-doc.cadn.net.cn

设置观察启用true卡夫卡模板容器属性以便观察;这将使微米计时器失效,因为计时器将根据每次观测被管理。spring-doc.cadn.net.cn

Micrometer Observation不支持批量监听器;这将启用微米计时器

更多信息请参见“微尺追踪”。spring-doc.cadn.net.cn

要给定时器/追踪添加标签,请自定义配置卡夫卡模板观察约定卡夫卡听众观察大会分别是模板或监听器容器。spring-doc.cadn.net.cn

默认实现会添加bean.name模板观察和listener.id容器标签。spring-doc.cadn.net.cn

你可以选择任一子职业默认KafkaTemplateObservationConvention默认Kafka听众观察约定或者提供全新的实现。spring-doc.cadn.net.cn

有关默认记录观测的详细信息,请参见“微米观测文档”。spring-doc.cadn.net.cn

从3.0.6版本开始,你可以根据消费者或生产者记录中的信息,为计时器和追踪添加动态标签。 为此,添加自定义卡夫卡听众观察大会和/或卡夫卡模板观察约定到监听器容器属性或卡夫卡模板分别。 这记录在两个观测语境中,属性都包含消费者记录制作人唱片分别。spring-doc.cadn.net.cn

发送方和接收方上下文remoteServiceName属性被设定为卡夫卡clusterId财产;该值由卡夫卡管理员. 如果因为某些原因——比如缺乏管理员权限,你无法恢复集群ID,从3.1版本开始,你可以设置手动设置clusterId卡夫卡管理员并注射到卡夫卡模板S和听众容器。 当它(默认情况下),管理员将调用describeCluster管理员作,用来从经纪人那里取回它。spring-doc.cadn.net.cn

批次监听者观察

使用批处理监听器时,默认情况下不会创建任何观测值,即使观测登记册存在。 这是因为观察的范围与线程绑定,而使用批量监听器时,观察与记录之间没有一一对应。spring-doc.cadn.net.cn

要在批处理监听器中启用每条记录的观测值,请设置容器工厂属性记录批次观察true.spring-doc.cadn.net.cn

@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.getContainerProperties().setRecordObservationsInBatch(true);
    return factory;
}

当该性质为true,会为批次中的每个记录创建一个观察值,但该观察不会传播到监听器方法。 应用程序随后可以使用观测上下文跟踪批次中每条记录的处理情况。 这让你即使在批处理环境中也能看到每条记录的处理过程。spring-doc.cadn.net.cn