|
对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0! |
处理异常
本节介绍了如何使用 Apache Kafka 使用 Spring 时可能出现的各种异常。
监听器错误处理程序
从2.0版本开始,@KafkaListener注释新增了一个属性:errorHandler.
你可以使用errorHandler提供豆子名称KafkaListenerErrorHandler实现。
该功能接口只有一种方法,如下列表所示:
@FunctionalInterface
public interface KafkaListenerErrorHandler {
Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
}
你可以访问春季消息留言<?>由消息转换器产生的对象以及监听者抛出的异常,后者被包裹在ListenerExecutionFailedException.
错误处理程序可以抛出原始或新的异常,这些异常会被抛入容器。
错误处理者返回的任何内容都会被忽略。
从2.7版本开始,你可以设置原始记录标题属性消息信息转换器和批处理消息传递转换器这导致了消费者记录将加入改装后的留言<?>在KafkaHeaders.RAW_数据页眉。
例如,如果你想使用死信出版恢复者在监听器错误处理程序中。
它可能用于请求/回复场景,当你希望在多次重试后,在捕获死信主题中的失败记录后,向发送方发送失败结果。
@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
它有一个子接口(消费者意识听众错误处理器)通过以下方法访问消费者对象:
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
另一个子接口(ManualAckListenerErrorHandler)提供访问确认使用手动时的目标AckModes.
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
无论哪种情况,你都不应对消费者进行任何寻觅,因为容器不会察觉到他们的存在。
容器错误处理程序
从2.8版本开始,遗产错误处理程序和批处理错误处理器接口已被新的通用错误处理器.
这些错误处理程序可以同时处理记录和批处理监听器的错误,使单个监听器容器工厂能够为这两种类型的监听器创建容器。通用错误处理器提供了替代大多数遗留框架错误处理器实现的实现。
看迁移自定义遗留错误处理器实现通用错误处理器用于迁移自定义错误处理程序的信息通用错误处理器.
当使用事务时,默认不会配置错误处理程序,因此异常会回滚事务。
事务容器的错误处理由AfterRollback处理器.
如果你在使用事务时提供了自定义错误处理程序,如果想回滚交易,它必须抛出异常。
该接口有一个默认方法isAckAfterHandle()容器调用该值以判断如果错误处理程序返回时未抛出异常,是否应提交偏移量;默认情况下,它返回为真。
通常,框架提供的错误处理程序在错误未被“处理”时(例如执行寻道作后)会抛出异常。
默认情况下,此类异常会被容器记录在错误水平。
所有框架错误处理程序都得到了扩展KafkaExceptionLogLevelAware这让你可以控制这些异常记录的级别。
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
...
}
你可以指定一个全局错误处理程序,用于容器工厂中的所有监听器。 以下示例展示了如何实现:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler);
...
return factory;
}
默认情况下,如果注释监听器方法抛出异常,会抛入容器,消息根据容器配置处理。
容器在调用错误处理程序前提交任何待处理的偏移提交。
如果你用的是 Spring Boot,只需添加错误处理程序作为@BeanBoot 会把它添加到自动配置的工厂。
退后,控者
错误处理程序如 DefaultErrorHandler 使用退避以确定在重新尝试送达前该等待多久。
从2.9版本开始,你可以自定义配置后退处理员.
默认处理程序会直接暂停线程,直到回退时间过去(或容器被停止)。
该框架还提供容器暂停退去处理器这会暂停监听器容器,直到退回时间过去,然后继续使用容器。
当延迟超过max.poll.interval.ms消费品。
注意,实际回撤时间的分辨率会受到投票时间容器属性。
默认错误处理
这个新的错误处理程序取代了SeekToCurrentErrorHandler和恢复批处理错误处理这些错误处理程序已经是多个版本的默认处理程序。
一个区别是批处理监听器的后备行为(当有除BatchListenerFailedException是抛弃的)相当于重试完整批次。
从2.9版本开始,默认错误处理可以配置为提供与下文讨论的未处理记录偏移相同的语义,但实际上无需寻求。
相反,记录会被监听器容器保留,并在错误处理程序退出(并且执行一次暂停后)后重新提交给监听器poll(),以保证消费者的生命;如果非阻塞重试或容器暂停退去处理器暂停可能跨越多个轮询)。
错误处理程序返回一个结果给容器,指示当前失败记录是否可以重新提交,或者是否已被恢复后不会再次发送给监听器。
要启用此模式,请设置seekAfterError(寻找过错)自false. |
错误处理程序可以恢复(跳过)持续失败的记录。
默认情况下,在十次失败后,失败记录会被记录(在错误等级)。
你可以用自定义恢复器配置处理程序 (双消费者) 以及退避它控制了每次投递尝试和延迟。
使用固定退后跟FixedBackOff.UNLIMITED_ATTEMPTS导致(实际上)无限次重试。
以下示例配置三次尝试后的恢复:
DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
要配置监听器容器并自定义该处理器实例,需将其添加到容器工厂。
例如,当@KafkaListener容器工厂,你可以添加默认错误处理如下:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
对于记录监听器,这将重试一次最多2次(3次传递尝试),回溯时间为1秒,而不是默认配置(固定后退(0L,9)).
失败在重试次数用尽后直接记录。
例如,如果民意调查返回六条记录(每个分区0、1、2各两条),监听器在第四条记录抛出异常,容器通过提交前三条消息的偏移量来确认它们。
这默认错误处理试图为第1区划偏移1,为第2区偏移0。
下一个poll()返回三条未处理的记录。
如果AckMode是Batch容器在调用错误处理程序前,先提交前两个分区的偏移量。
对于批次监听器,监听者必须抛出BatchListenerFailedException以显示批次中哪些记录失败。
事件顺序如下:
-
在索引之前提交记录的偏移量。
-
如果重试次数未用尽,执行寻址以确保所有剩余记录(包括失败记录)重新交付。
-
如果重试用尽,尝试恢复失败记录(仅默认日志),并执行寻址,以重新交付剩余记录(不包括失败记录)。 恢复记录的偏移量被提交。
-
如果重试用尽且恢复失败,寻道则视同未重试而行。
从2.9版本开始,默认错误处理可以配置为提供与上述未处理记录偏移量相同的语义,但实际上无需寻求。
相反,错误处理程序会创建一个新的消费者记录<?, ?>仅包含未处理的记录,这些记录将在执行一次暂停后提交给听者poll(),以保持消费者生命)。
要启用此模式,请设置seekAfterError(寻找过错)自false. |
默认恢复器在重试用尽后记录失败记录。
你可以使用自定义恢复器,或者框架提供的恢复器,比如死信出版恢复者.
当使用 POJO 批处理监听器时(例如清单<事>),你没有完整的消费者记录可以添加到异常中,你可以直接添加失败记录的索引:
@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
for (int i = 0; i < things.size(); i++) {
try {
process(things.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", i);
}
}
}
当容器配置为AckMode.MANUAL_IMMEDIATE,错误处理程序可以配置为提交恢复记录的偏移量;设置commitRecovered属性到true.
另见“发布死信记录”。
在使用事务时,类似的功能由DefaultAfterRollback处理器.
参见后回滚处理器。
这默认错误处理认为某些例外是致命的,因此跳过重试;恢复器在第一次失败时被调用。默认被视为致命的异常有:
-
反序列化例外 -
MessageConversionException -
ConversionException -
方法参数解析异常 -
NoSuchMethodException -
ClassCastException
因为这些例外在重试投递中不太可能解决。
你可以在不可重试类别中添加更多异常类型,或者完全替换分类异常的映射。参见 JavadocsDefaultErrorHandler.addNotRetryableException()和DefaultErrorHandler.setClassifications()更多信息以及相关信息春季重试 二进制异常分类器.
这里有一个例子,补充了IllegalArgumentException对于不可重试的例外:
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
错误处理程序可以配置为一个或多个RetryListeners,接收重试和恢复进度的通知。
从2.8.10版本开始,增加了批处理监听器的方法。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
}
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
}
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
}
}
更多信息请参见JavaDocs。
如果恢复器失败(抛出异常),失败记录将被包含在寻道中。
如果恢复器失败,则退避默认会重置,重新投递会先经过后退,然后才会再次尝试回收。
要在恢复失败后跳过重试,设置错误处理程序的resetStateOnRecoveryFailure(重置状态恢复失败)自false. |
你可以为错误处理程序提供BiFunction<ConsumerRecord<?, ?>,例外,BackOff>以确定退避根据失败记录和/或例外情况,使用如下:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回零,作者的默认状态退避将被使用。
设置resetStateOnExceptionChange自true重试序列将被重新开始(包括选择新的退避如果异常类型在失败之间发生变化,则 。
什么时候false(2.9版本之前的默认设置),不考虑例外类型。
从2.9版本开始,现在情况变了true默认。
另见投递尝试标题。
使用批处理错误处理程序的转换错误
从版本 2.8 开始,批量监听器现在可以在使用消息转换器其中ByteArray 反串化器一个字节解串器或者字符串解串器,以及默认错误处理.
当发生转换错误时,有效载荷被设置为空,并在记录头部添加反序列化异常,类似于ErrorHandlingDeserializer.
一份列表ConversionExceptionS在监听器中可用,因此监听器可以抛出BatchListenerFailedException表示转换异常发生的第一个索引。
例:
@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
for (int i = 0; i < in.size(); i++) {
Foo foo = in.get(i);
if (foo == null && exceptions.get(i) != null) {
throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
}
process(foo);
}
}
重试完整批次
这就是默认错误处理对于批处理监听器,监听器抛出的例外不是BatchListenerFailedException.
无法保证当一个批次重新交付时,该批次的记录数量相同,或者重送记录的顺序是否相同。
因此,很难轻松地为一批次保持重试状态。
这FallbackBatchErrorHandler采取以下方法。
如果批处理监听器抛出的异常,那不是BatchListenerFailedException,重试是从内存批次记录中完成的。
为了避免在延长重试序列中发生重新平衡,错误处理程序会暂停消费者,在每次重试前轮询其,然后在休眠后返回,然后再次调用监听器。
如果/当重试次数用尽时,消费者记录恢复器对批次中的每条记录调用。
如果恢复器抛出异常,或线程在休眠时被中断,下一次轮询时将重新交付记录批次。
在退出之前,无论结果如何,消费者都会被恢复。
| 该机制不能用于交易。 |
在等待退避间隔中,错误处理程序会以短暂休眠循环,直到达到期望的延迟,同时检查容器是否已被停止,允许在停止()而不是造成延迟。
容器停止错误处理程序
这CommonContainerStoppingErrorHandler如果监听者抛出异常,则停止容器。
对于唱片听众来说,当AckMode是记录已处理记录的偏移量被提交。
对于唱片听众来说,当AckMode是任何手动值,已确认记录的偏移量被提交。
对于唱片听众来说,当AckMode是Batch对于批处理监听者,容器重启时会重放整个批次。
容器停止后,有一个例外将ListenerExecutionFailedException被抛出。
这是为了让交易回滚(如果启用了交易)。
错误处理程序委托
这CommonDelegatingErrorHandler可以根据异常类型委派给不同的错误处理程序。
例如,你可能想调用默认错误处理对于大多数例外,或CommonContainerStoppingErrorHandler为其他人。
所有代理必须共享相同的兼容属性(ackAfterHandle,seekAfterError(寻找过错)…).
使用不同的常见错误处理程序用于记录监听器和批量监听器
如果你想对记录和批处理监听器使用不同的错误处理策略,CommonMixedErrorHandler提供了允许为每个监听器类型配置特定错误处理程序的设备。
通用错误处理摘要
-
默认错误处理 -
CommonContainerStoppingErrorHandler -
CommonDelegatingErrorHandler -
CommonLoggingErrorHandler -
CommonMixedErrorHandler
遗留错误处理器及其替代者
| 遗留错误处理器 | 更换 |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
没有替代品,只能用 |
|
|
|
没有替代品,使用 |
迁移自定义遗留错误处理器实现通用错误处理器
请参阅 JavaDocs 中的文档通用错误处理器.
以替换错误处理程序或消费者意识错误处理器实施,你应该实施handleOne()然后离开seeksAfterHandle()返回false(默认)。
你还应该实施handleOtherException()处理记录处理范围之外的异常(例如消费者错误)。
替换剩余记录错误处理实施,你应该实施handleRemaining()并覆盖seeksAfterHandle()返回true(错误处理程序必须执行必要的寻道)。
你还应该实施handleOtherException()- 处理记录处理范围外的异常(例如消费者错误)。
替换任何批处理错误处理器实施,你应该实施handleBatch()你还应该实施handleOtherException()- 处理记录处理范围外的异常(例如消费者错误)。
回滚处理器之后
使用事务时,如果监听者抛出异常(且错误处理程序存在,则抛出异常),事务会被回滚。
默认情况下,任何未处理的记录(包括失败记录)会在下一次轮询时重新获取。
这通过表现来实现寻求在DefaultAfterRollback处理器. 使用批处理监听器时,整个批次记录会被重新处理(容器不知道批次中哪条记录失败了)。要修改这种行为,你可以用自定义配置监听器容器AfterRollback处理器. 例如,对于基于唱片的听众,你可能想记录失败的唱片,尝试几次后放弃,比如发布到无效的主题。
从2.2版本开始,DefaultAfterRollback处理器现在可以恢复(跳过)持续失败的记录。默认情况下,失败记录在十次失败后会被记录(在错误level)。你可以用自定义恢复器配置处理器(双消费者)以及最大失败率。设置maxFailures负数属性会导致无限次重试。以下示例配置三次尝试后的恢复:
AfterRollbackProcessor<String, String> processor =
new DefaultAfterRollbackProcessor((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
当你不使用事务时,可以通过配置默认错误处理. 参见容器错误处理程序。
从3.2版本开始,恢复现在可以恢复(跳过)整批不断失败的记录。 设置ContainerProperties.setBatchRecoverAfterRollback(true)以启用此功能。
| 默认行为,即批处理监听器无法恢复,因为框架无法判断批处理中哪个记录一直失败。在这种情况下,应用程序监听器必须处理一个不断失败的记录。 |
另见“发布死信记录”。
从2.2.5版本开始,DefaultAfterRollback处理器可以在新事务中调用(在失败的交易回滚后开始)。那么,如果你正在使用死信出版恢复者要发布失败记录,处理器会将恢复记录在原始主题/分区中的偏移量发送给事务。要启用此功能,设置commitRecovered和kafka模板在DefaultAfterRollback处理器.
如果恢复器失败(抛出异常),失败记录将被包含在寻道中。
从2.5.5版本开始,如果恢复器失败,退避默认会重置,重新投递会先经过后退,然后才会再次尝试回收。
在早期版本中,退避未重置,下一次失败时重新尝试恢复。要恢复到之前的行为,请设置处理器的resetStateOnRecoveryFailure(重置状态恢复失败)属性到false. |
从2.6版本开始,你现在可以为处理器提供BiFunction<ConsumerRecord<?, ?>,例外,BackOff>以确定退避根据失败记录和/或例外情况,使用如下:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回零,处理器的默认值退避将被使用。
从版本2.6.3开始,设置resetStateOnExceptionChange自true重试序列将被重新开始(包括选择新的退避如果异常类型在失败之间发生变化,则 。
默认情况下,该例外类型不会被考虑。
从2.3.1版本开始,类似于默认错误处理这DefaultAfterRollback处理器认为某些例外是致命的,因此跳过重试;恢复器在第一次失败时被调用。默认被视为致命的异常有:
-
反序列化例外 -
MessageConversionException -
ConversionException -
方法参数解析异常 -
NoSuchMethodException -
ClassCastException
因为这些例外在重试投递中不太可能解决。
你可以在不可重试类别中添加更多异常类型,或者完全替换分类异常的映射。参见 JavadocsDefaultAfterRollbackProcessor.setClassifications()更多信息以及相关信息春季重试 二进制异常分类器.
这里有一个例子,补充了IllegalArgumentException对于不可重试的例外:
@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
processor.addNotRetryableException(IllegalArgumentException.class);
return processor;
}
另见投递尝试标题。
与当前的卡夫卡客户端,容器无法检测制片人受限例外是由重新平衡引起的,或者生产者transactional.id因超时或到期而被撤销。由于大多数情况下,是由于重平衡引起的,容器不会调用AfterRollback处理器(因为我们不再被分配分区,因此不适合寻找分区)。如果你确保超时足够大以处理每个事务,并定期执行“空”事务(例如通过ListenerContainerIdleEvent你可以因为超时和到期而避免围栏。或者,你可以设置停止容器围栏容器属性 到true容器会停止,避免记录丢失。你可以消费ConsumerStoppedEvent并检查原因的性质围栏以检测该条件。由于事件中也引用了容器,你可以用该事件重启容器。 |
从2.7版本开始,等待退避间隔中,错误处理程序会以短暂休眠循环,直到达到期望的延迟,同时检查容器是否已被停止,允许在停止()而不是造成延迟。
从2.7版本开始,处理器可以配置一个或多个RetryListeners,接收重试和恢复进度的通知。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
}
更多信息请参见JavaDocs。
投递尝试头
以下内容仅适用于录音听众,不适用于批量听众。
从2.5版本开始,当使用错误处理程序或AfterRollback处理器实现DeliveryAttemptAware,可以使 能够KafkaHeaders.DELIVERY_ATTEMPT头部 (kafka_deliveryAttempt)记录下来。
该头部的值是从1开始递增的整数。
收到生化反应时消费者记录<?, ?>整数属于字节[4].
int delivery = ByteBuffer.wrap(record.headers()
.lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
.getInt();
使用@KafkaListener其中DefaultKafkaHeaderMapper或SimpleKafkaHeaderMapper,可以通过加法得到@Header(KafkaHeaders.DELIVERY_ATTEMPT) 内力投递作为监听方法的一个参数。
要启用该头部的填充,设置容器属性deliveryAttemptHeader自true.
默认情况下,它被禁用,以避免查找每条记录的状态并添加报头的(小)开销。
这默认错误处理和DefaultAfterRollback处理器支持此功能。
监听器信息头
在某些情况下,能够知道监听者运行在哪个容器中是有用的。
从2.8.4版本开始,你现在可以设置听众信息监听器容器上的属性,或设置信息属性@KafkaListener注解。
然后,容器会在KafkaListener.LISTENER_INFO所有收到消息的头部;然后它可以用于记录拦截器、Filter等,或者直接用于监听器本身。
@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
info = "this is the something listener")
public void listen(@Payload Thing thing,
@Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
...
}
当用于记录拦截者或RecordFilterStrategy实现时,头部以字节数组的形式出现在消费者记录中,通过KafkaListenerAnnotationBeanPostProcessor的charSet财产。
头部映射器还能转换为字符串创作时消息头从消费者记录中,切勿将该头映射到出站记录。
对于POJO批处理监听器,从2.8.6版本开始,头部会被复制到批的每个成员中,并且也可作为单一版本使用。字符串参数转换后。
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
...
}
如果批处理监听器有过滤器,而过滤器结果是空的批次,你需要添加必要 = 错误前往@Header参数,因为空批次的信息不可用。 |
如果你收到清单<信息<事>>相关信息在KafkaHeaders.LISTENER_INFO每个的头部留言<?>.
有关消费批次的更多信息,请参见批处理监听器。
出版无效唱片
你可以配置默认错误处理和DefaultAfterRollback处理器当记录的失败次数达到最大时,使用记录恢复器。
该框架提供了死信出版恢复者,将失败的信息发布到另一个主题。
恢复者需要KafkaTemplate<Object, Object>,用于发送记录。
你也可以选择性地用BiFunction<ConsumerRecord<?, ?>,例外,TopicPartition>调用该函数以解析目标主题和分区。
默认情况下,死信记录会发送到名为<原始话题>。双重学习技术(原始主题名称后缀为.双重学习技术)并映射到与原始记录相同的分区。
因此,使用默认解析器时,死号主题的分区数量必须至少与原始主题相同。
|
如果他们回来了主题分区有一个负划分,该划分不设在制作人唱片,因此划分由卡夫卡选定。
从2.2.4版本开始,任何ListenerExecutionFailedException(例如,当检测到异常时抛出@KafkaListener方法)通过组ID财产。
这使得目标解析器除了利用消费者记录选择死信主题。
以下示例展示了如何接线自定义目的地解析器:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
发送到死符主题的记录会通过以下头部进行增强:
-
KafkaHeaders.DLT_EXCEPTION_FQCN: 例外类名称(通常为ListenerExecutionFailedException,但也可以是其他的)。 -
KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN: 例外会导致类名(如果存在,自版本2.8起)。 -
KafkaHeaders.DLT_EXCEPTION_STACKTRACE:例外栈跟踪。 -
KafkaHeaders.DLT_EXCEPTION_MESSAGE:例外信息。 -
KafkaHeaders.DLT_KEY_EXCEPTION_FQCN: 例外类名称(仅限密钥反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE: 例外栈跟踪(仅键反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE: 例外消息(仅限密钥反序列化错误)。 -
KafkaHeaders.DLT_ORIGINAL_TOPIC:原始话题。 -
KafkaHeaders.DLT_ORIGINAL_PARTITION:原始划分。 -
KafkaHeaders.DLT_ORIGINAL_OFFSET:原始偏移量。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP:原始时间戳。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE:原始时间戳类型。 -
KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP:原始未能处理记录的消费者组(自2.8版本起)。
关键例外仅由以下原因引起反序列化例外所以没有DLT_KEY_EXCEPTION_CAUSE_FQCN.
增加更多头部有两种机制。
-
恢复者和覆盖者子类
createProducerRecord()-叫super.createProducerRecord()并添加更多的页眉。 -
提供一个
双功能接收消费者记录和异常,返回头对象;从那里获取的头部将被复制到最终的生产者记录中;另见管理死字母记录头。 用setHeadersFunction()以设置双功能.
第二种实现更简单,但第一种包含更多信息,包括已组装好的标准头部。
从2.3版本开始,当与ErrorHandlingDeserializer,出版商将恢复记录value(),在死信生产者记录中,恢复到未被解序的原始值。
此前,value()是空,用户代码必须解码反序列化例外从消息头部获取。
此外,你还可以提供多个卡夫卡模板s向出版社致函;例如,如果你想发布字节[]来自反序列化例外以及使用不同序列化器与成功反串行的记录的值。
这里有一个配置发布者的示例,用卡夫卡模板使用字符串和字节[]序列化器:
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
KafkaTemplate<?, ?> bytesTemplate) {
Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
出版商利用地图键寻找适合value()即将出版。
一个LinkedHashMap建议按顺序检查密钥。
出版时零值,且存在多个模板,恢复器会寻找无效类;如果不存在,则取出values().iterator()将被使用。
从2.7版本开始,你可以使用setFailIfSendResultIsError当消息发布失败时,会抛出异常。
你也可以设置一个超时时间来验证发送者的成功情况,使用setWaitForSendResultTimeout.
如果恢复器失败(抛出异常),失败记录将被包含在寻道中。
从2.5.5版本开始,如果恢复器失败,退避默认会重置,重新投递会先经过后退,然后才会再次尝试回收。
在早期版本中,退避未重置,下一次失败时重新尝试恢复。
要恢复到之前的行为,设置错误处理程序的resetStateOnRecoveryFailure(重置状态恢复失败)属性到false. |
从版本2.6.3开始,设置resetStateOnExceptionChange自true重试序列将被重新开始(包括选择新的退避如果异常类型在失败之间发生变化,则 。
默认情况下,该例外类型不会被考虑。
从2.3版本开始,恢复器也可以与Kafka Streams一起使用——更多信息请参见“从反序列化异常恢复”。
这ErrorHandlingDeserializer在头部中添加反序列化异常ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER和ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER(使用 Java 序列化)。
默认情况下,这些头部不会被保留在发布到死字母主题的消息中。
从版本 2.7 开始,如果键和值都无法反序列化,则会将两者的原始值填充到发送给 DLT 的记录中。
如果输入记录相互依赖,但可能出现顺序混乱,将失败记录重新发布到原始主题尾部(次数有限),而不是直接发送到死符主题,可能会有帮助。 请参见这个 Stack Overflow 问题作为示例。
以下错误处理程序配置将实现这一点:
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("topic.DLT", rec.partition())
: new TopicPartition("topic", rec.partition());
}), new FixedBackOff(0L, 0L));
}
从版本 2.7 开始,恢复器检查目标解析器选中的分区是否真实存在。
如果不存在该划分,则该划分在制作人唱片设置为零,允许卡夫卡制片人选择分区。
你可以通过设置验证分区属性到false.
从3.1版本开始,设置logRecoveryRecord属性到true将记录恢复记录和异常。
管理死符记录头
-
附加原始标题(默认true) -
stripPreviousExceptionHeaders(默认true自2.8版本起)
Apache Kafka 支持同名的多个头部;要获得“最新”值,你可以用headers.lastHeader(headerName);要获得多个头部的迭代器,请使用headers.headers(headerName).iterator().
当反复重新发布失败记录时,这些头部可能会增长(最终导致发布失败,原因包括记录过大例外);这在异常头和栈跟踪头尤为明显。
这两种属性的原因是,虽然你可能只想保留最后的例外信息,但你可能希望保留记录在每次失败时通过哪些主题的历史记录。
附加原始标题应用于所有被命名为源语言而stripPreviousExceptionHeaders应用于所有被命名为例外.
从2.8.4版本开始,你可以控制哪些标准头部会添加到输出记录中。
参见enum HeadersToAdd对于默认添加的(目前)10个标准头的通用名称(这些不是实际的头部名称,只是抽象;实际头部名称由getHeaderNames()该方法可被子类覆盖。
要排除标题,请使用excludeHeaders()方法;例如,为了抑制在首部添加异常栈跟踪,可以使用:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
此外,你还可以通过添加ExceptionHeadersCreator;这也会禁用所有标准异常头。
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
kafkaHeaders.add(new RecordHeader(..., ...));
});
从2.8.4版本开始,你现在可以通过addHeaders函数方法。
这使得即使已有其他功能注册,例如使用非阻塞重试时,也能应用额外的功能。
指数倒退与最大次数实现
Spring Framework 提供了若干退避实现。
默认情况下,指数退让将无限期重试;在一定次数的重试后放弃需要计算最大经过时间.
自2.7.3版本起,Apache Kafka的Spring提供了指数倒退与最大次数这是一个子类,接收到maxRetries并自动计算最大经过时间,这更方便一些。
@Bean
DefaultErrorHandler handler() {
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
bo.setInitialInterval(1_000L);
bo.setMultiplier(2.0);
bo.setMaxInterval(10_000L);
return new DefaultErrorHandler(myRecoverer, bo);
}
之后会再试1, 2, 4, 8, 10, 10几秒钟后,才叫来救援者。