该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0spring-doc.cadn.net.cn

主题命名

重试主题和DLT的名称通过在主主题后加上提供的或默认值,并附加该主题的延迟或索引来命名。spring-doc.cadn.net.cn

“我的主题” → “我的主题-重试-0”,“我的主题-重试-1”,......,“我的主题-DLT”spring-doc.cadn.net.cn

“my-other-topic” → “my-topic-myRetrySuffix-1000”, “my-topic-myRetrySuffix-2000”, ..., “my-topic-myDltSuffix”spring-doc.cadn.net.cn

默认行为是为每次尝试创建独立的重试主题,并附加索引值:retry-0, retry-1, ..., retry-n。 因此,默认重试主题的数量是配置的最大尝试次数减1分。

重试主题与DLT后缀

你可以指定重试和DLT主题将使用的后缀。spring-doc.cadn.net.cn

@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .retryTopicSuffix("-my-retry-suffix")
            .dltTopicSuffix("-my-dlt-suffix")
            .create(template);
}
默认后缀分别是“-retry”和“-dlt”,分别代表重试主题和DLT。

附加主题索引或延迟

你可以在后缀后加上主题的索引值或延迟值。spring-doc.cadn.net.cn

@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .suffixTopicsWithIndexValues()
            .create(template);
    }
默认行为是加上延迟值后缀,除非是固定延迟配置且包含多个主题,此时主题会加上主题的索引后缀。

固定延迟重试的单一主题

如果你使用固定延迟政策,比如固定退后政策禁止退赛政策你可以用一个主题来完成非阻塞性的重试。 本主题将以提供的或默认后缀作为后缀,且不会附加索引或延迟值。spring-doc.cadn.net.cn

之前的固定延迟策略现已弃用,可以替换为相同时间间隔主题再利用策略.
@RetryableTopic(backoff = @Backoff(2_000), sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3_000)
            .maxAttempts(5)
            .useSingleTopicForSameIntervals()
            .create(template);
}
默认行为是为每次尝试创建独立的重试主题,并附加其索引值:retry-0, retry-1, ...

最大区间指数延迟的单一主题

如果你使用指数退避策略(指数退让政策),你可以用单一重试主题来完成延迟为配置的尝试的非阻塞重试最大区间.spring-doc.cadn.net.cn

这个“最终”重试主题将加上提供的或默认后缀,并且会带有索引或最大区间附加价值。spring-doc.cadn.net.cn

通过选择使用单一主题进行重试,且最大区间延迟时,配置一个指数重试策略,持续长时间重试可能更可行,因为这种方法不需要大量主题。

从 3.2 版本开始,默认行为是重复使用相同的重试主题,使用指数回退时,重试主题后缀为延迟值,最后一次重试主题重复使用相同的区间(对应于最大区间延迟)。spring-doc.cadn.net.cn

例如,在配置指数退回时,用initialInterval=1_000,乘数=2maxInterval=16_000,为了连续尝试一小时,需要配置最大尝试次数作为229,默认情况下所需的重试主题为:spring-doc.cadn.net.cn

使用重试主题数等于配置的策略时最大尝试次数负1,最后一次重试主题(对应于最大区间延迟)如果加上额外的索引,则为:spring-doc.cadn.net.cn

如果需要多个主题,则可以通过以下配置实现。spring-doc.cadn.net.cn

@RetryableTopic(attempts = 230,
    backoff = @Backoff(delay = 1_000, multiplier = 2, maxDelay = 16_000),
    sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1_000, 2, 16_000)
            .maxAttempts(230)
            .useSingleTopicForSameIntervals()
            .create(template);
}

自定义命名策略

更复杂的命名策略可以通过注册一个实现RetryTopicNamesProviderFactory. 默认实现为后缀重新尝试TopicNamesProviderFactory另一种实现可以通过以下方式注册:spring-doc.cadn.net.cn

@Override
protected RetryTopicComponentFactory createComponentFactory() {
    return new RetryTopicComponentFactory() {
        @Override
        public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
            return new CustomRetryTopicNamesProviderFactory();
        }
    };
}

举例来说,以下实现除了标准后缀外,还在retry/dlt主题名称前添加前缀:spring-doc.cadn.net.cn

public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {

    @Override
    public RetryTopicNamesProvider createRetryTopicNamesProvider(
                DestinationTopic.Properties properties) {

        if (properties.isMainEndpoint()) {
            return new SuffixingRetryTopicNamesProvider(properties);
        }
        else {
            return new SuffixingRetryTopicNamesProvider(properties) {

                @Override
                public String getTopicName(String topic) {
                    return "my-prefix-" + super.getTopicName(topic);
                }

            };
        }
    }

}