配置

默认设置中,通过添加@RetryableTopic注释到@KafkaListener方法。 这是推荐且最简单的方法,因为它会自动配置所需的重试基础设施,并以默认设置创建重试和DLT主题。spring-doc.cadn.net.cn

要导入非阻塞重试基础设施并将其组件暴露为豆子,请注释 a@Configuration类为@EnableKafkaRetryTopic. 这使得对该功能组件的注入和运行时查找成为可能,并为高级和全局配置提供了基础。spring-doc.cadn.net.cn

其实也不必再加@EnableKafka,如果你加上这个注释,因为@EnableKafkaRetryTopic是元注释,记为@EnableKafka.

为了高级和全球定制,请扩展重试主题配置支持在单一中@Configuration类并覆盖相关方法。 更多详情请参见“配置全局设置和功能”。spring-doc.cadn.net.cn

默认情况下,重试主题的容器与主容器具有相同的并发性。 从3.0版本开始,你可以设置不同的并发对于重试容器(无论是在注释上,还是在重试主题配置构建器).spring-doc.cadn.net.cn

只使用上述两种全局配置方法中的一种(@EnableKafkaRetryTopic或延伸重试主题配置支持). 此外,只有一个@Configuration课程应该扩展重试主题配置支持.spring-doc.cadn.net.cn

使用@RetryableTopic注解

要配置重试主题和 DLT 以实现@KafkaListener注释方法,你只需要添加@RetryableTopic对它的注释和 Spring for Apache Kafka 会引导所有必要的主题和默认配置的消费者。spring-doc.cadn.net.cn

@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
    // ... message processing
}

自3.2版本起,@RetryableTopic对@KafkaListener支持的课程包括:spring-doc.cadn.net.cn

@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {

    @KafkaHandler
    public void processMessage(MyPojo message) {
        // ... message processing
    }

}

你可以通过用@DltHandler注解。 如果没有提供DltHandler方法,会创建一个默认的消费者,只记录消耗情况。spring-doc.cadn.net.cn

@DltHandler
public void processMessage(MyPojo message) {
    // ... message processing, persistence, etc
}
如果你不指定kafka模板,就用一个名字的豆子defaultRetryTopicKafkaTemplate会被查到。 如果找不到豆子,则会投出例外。

从3.0版本开始,@RetryableTopic注释可以作为自定义注释的元注释使用;例如:spring-doc.cadn.net.cn

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {

    @AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
    String parallelism() default "3";

}

RetryTopicConfiguration

你也可以通过创建非阻塞重试支持来配置RetryTopicConfiguration豆子@Configuration注释类。spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .create(template);
}

这将为所有标注为@KafkaListener使用默认配置。这卡夫卡模板实例是消息转发的必需。spring-doc.cadn.net.cn

为了更细致地控制每个主题的非阻塞性重审,建议多个RetryTopicConfiguration豆子可以提供。spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3000)
            .maxAttempts(5)
            .concurrency(1)
            .includeTopics(List.of("my-topic", "my-other-topic"))
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1000, 2, 5000)
            .maxAttempts(4)
            .excludeTopics(List.of("my-topic", "my-other-topic"))
            .retryOn(MyException.class)
            .create(template);
}
重试主题和DLT的消费者会被分配到一个消费者组,组ID是你在组ID参数@KafkaListener附上主题后缀的注释。 如果你不提供任何选项,它们都会属于同一个群体,而在重试主题上重新平衡会导致主主题不必要的重新平衡。
如果消费者配置为ErrorHandlingDeserializer,为了处理反序列化异常,重要的是配置卡夫卡模板以及其制作者,配备一个既能处理普通对象也能处理原始内容的串行化器字节[]这些值是由反序列化例外产生的。 模板的通用值类型应为对象. 一种方法是使用DelegatingByTypeSerializer;以下是一个示例:
@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
        new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
               MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
倍数@KafkaListener注释可以用于同一主题,无论是否手动分配分区,并配合非阻塞重试,但每个主题只会使用一种配置。 最好只用一个RetryTopicConfiguration用于此类主题配置的 BEAN;如果是多重的话@RetryableTopic注释是针对同一主题使用,所有注释的值都应相同,否则其中一个会被应用到该主题的所有监听者,而其他注释的值将被忽略。

配置全局设置和功能

自2.9版本起,之前用于配置组件的“bean”覆盖方法已被移除(由于API的实验性质,未被弃用)。 这并不改变RetryTopicConfigurationBEANS方法——仅针对基础设施组件的配置。 现在重试主题配置支持课程应扩展为(单一)@Configuration类,以及被覆盖的正确方法。 举个例子:spring-doc.cadn.net.cn

@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {

    @Override
    protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
        blockingRetries
                .retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
                .backOff(new FixedBackOff(3000, 3));
    }

    @Override
    protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
        nonBlockingFatalExceptions.add(MyNonBlockingException.class);
    }

    @Override
    protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
        // Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
        customizersConfigurer.customizeErrorHandler(eh -> {
            eh.setSeekAfterError(false);
        });
    }

}
使用这种配置方法时,@EnableKafkaRetryTopic注释不应用于防止因重复豆子而导致上下文启动失败。 用简单的@EnableKafka而是注释。

什么时候自动创建主题成立时,主主题和重试主题将按照指定的分区数和复制因子创建。 从3.0版本开始,默认复制因子为-1,即使用经纪商违约。 如果你的经纪商版本早于 2.4,你需要设置明确的数值。 要覆盖特定主题(例如主主题或DLT)的这些值,只需添加一个新主题 @Bean具备所需的性质;这会覆盖自动创建属性。spring-doc.cadn.net.cn

默认情况下,记录会通过接收记录的原始分区发布给重试主题。 如果重试主题的分区数少于主主题,你应适当配置框架;以下是一个示例。
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {

    @Override
    protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
        return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
    }

    ...

}

函数的参数是消费者记录和下一个主题的名称。 你可以返回特定的分区号,或者用以表示卡夫卡制片人应该决定分区。spring-doc.cadn.net.cn

默认情况下,当记录通过重试主题时,所有重试头部的值(尝试次数、时间戳)都会被保留。 从2.9.6版本开始,如果你只想保留这些头部的最后一个值,可以使用configureDeadLetterPublishingContainerFactory()上述设置工厂retainAllRetryHeaderValues属性到false.spring-doc.cadn.net.cn

查找RetryTopicConfiguration

尝试提供RetryTopicConfiguration通过从 一个 创建一个@RetryableTopic注释,或者如果没有注释,则从豆子容器中获取。spring-doc.cadn.net.cn

如果容器中发现了豆子,会检查是否应由这些实例处理这些主题。spring-doc.cadn.net.cn

如果@RetryableTopic注释为DltHandler注释方法会被查找。spring-doc.cadn.net.cn

自3.2版本起,提供新的API以创建RetryTopicConfiguration什么时候@RetryableTopic类注释:spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic() {
    RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
    return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}

@RetryableTopic
public static class AnnotatedClass {
    // NoOps
}