该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0spring-doc.cadn.net.cn

串行化、反串化与消息转换

概述

Apache Kafka 提供了一个用于序列化和反序列化记录值及其键的高级 API。它包含在org.apache.kafka.common.serialization.Serializer<T>org.apache.kafka.common.serialization.Deserializer<T>带有一些内置实现的抽象。同时,我们可以通过以下方式指定串行器和反串行器类制作人消费者配置属性。以下示例展示了如何实现:spring-doc.cadn.net.cn

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

对于更复杂或特殊的情况,卡夫卡消费者(因此,卡夫卡制片人)提供过载的构造函数串行器反串化器的实例钥匙分别。spring-doc.cadn.net.cn

当你使用这个 API 时,默认KafkaProducerFactoryDefaultKafkaConsumerFactory还要通过构造函数或设定器方法提供属性以注入自定义串行器反串化器实例进入目标制作人消费者. 另外,你也可以通过提供商<序列号生成器>提供商<去串行器>通过构造子实现实例——这些提供商每个 的创建时 都被调用制作人消费者.spring-doc.cadn.net.cn

字符串序列化

自2.5版本起,Spring for Apache Kafka 提供了以下内容ToStringSerializer解析字符串反序列化器使用String表示实体的类。 他们依赖于各种方法toString以及一些Function<String>双功能<字符串,头部>解析字符串并填充实例的属性。 通常,这会调用类上的某种静态方法,例如解析:spring-doc.cadn.net.cn

ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);

默认情况下,ToStringSerializer配置为传递记录中序列化实体的类型信息. 你可以通过设置addTypeInfo属性到false. 这些信息可以被解析字符串反序列化器在接受方。spring-doc.cadn.net.cn

  • ToStringSerializer.ADD_TYPE_INFO_HEADERS(默认true你可以设置为falseToStringSerializer(设置addTypeInfo财产)。spring-doc.cadn.net.cn

ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
    byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
    String entityType = new String(header);

    if (entityType.contains("Thing")) {
        return Thing.parse(str);
    }
    else {
        // ...parsing logic
    }
});

你可以配置字符集用于转换字符串来/回字节[]默认为UTF-8.spring-doc.cadn.net.cn

你可以用解析器方法的名称配置解串器,使用ConsumerConfig性能:spring-doc.cadn.net.cn

属性必须包含类的完整限定名称,后面是方法名,中间用句点分隔.. 该方法必须是静态的,并且符号为(字符串,头部)(弦乐).spring-doc.cadn.net.cn

一个致StringSerde也提供与Kafka Streams一起使用的版本。spring-doc.cadn.net.cn

JSON

Spring for Apache Kafka 也提供JsonSerializerJsonDeserializer基于 Jackson JSON 对象映射器。 这JsonSerializer允许将任意 Java 对象写成 JSON字节[]. 这JsonDeserializer需要额外的职业<?>目标类型允许对被消耗的 进行反序列化的参数字节[]到正确的目标物体。 以下示例展示了如何创建JsonDeserializer:spring-doc.cadn.net.cn

JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);

你可以自定义两者JsonSerializerJsonDeserializer带有对象映射器. 你也可以扩展它们,实现某些特定的配置逻辑configure(Map<String, ?> configs, boolean isKey)方法。spring-doc.cadn.net.cn

从版本 2.3 开始,所有支持 JSON 的组件默认配置为JacksonUtils.enhancedObjectMapper()实例,伴随MapperFeature.DEFAULT_VIEW_INCLUSIONDeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES功能被禁用。 此外,这样的实例还配备了已知的自定义数据类型模块,如 Java 时间和 Kotlin 支持。 看JacksonUtils.enhancedObjectMapper()更多信息请参见JavaDocs。 该方法还会注册org.springframework.kafka.support.JacksonMimeTypeModuleorg.springframework.util.MimeType将对象序列化为纯字符串,以便通过网络实现平台间兼容性。 一个JacksonMime类型模块可以在应用上下文中注册为 BEAN,并且会自动配置为Spring靴对象映射器实例.spring-doc.cadn.net.cn

同样从2.3版本开始,JsonDeserializer提供类型参考基于 的构造器,用于更好地处理目标通用容器类型。spring-doc.cadn.net.cn

从2.1版本开始,你可以在记录中传递类型信息允许处理多种类型的作。 此外,你还可以通过以下 Kafka 属性配置串行器和反串行器。 如果你已经提供了,这些条款就没有影响串行器反串化器的实例卡夫卡消费者卡夫卡制片人分别。spring-doc.cadn.net.cn

配置属性

从2.2版本开始,类型信息头部(如果由串行器添加)会被解串器移除。 你可以通过设置removeTypeHeaders属性到false,要么直接在反串器上,要么带有前面描述的配置性质。spring-doc.cadn.net.cn

从版本 2.8 开始,如果你按照程序化构建方式构建序列化器或反串行化器,工厂会应用上述属性,只要你没有明确设置任何属性(使用set*()方法或使用 fluent API)。 此前,在程序化创建时,配置属性从未应用;如果你直接在对象上显式设置属性,情况依然如此。

映射类型

从2.2版本开始,使用JSON时,可以通过使用前面列表中的属性来提供类型映射。 以前,你需要在串行器和解串器中自定义类型映射器。 映射由一个逗号分隔的列表组成token:className对。 在出站时,有效载荷的类名会映射到对应的Tokens。 在入站时,类型头中的Tokens映射到对应的类名。spring-doc.cadn.net.cn

以下示例创建一组映射:spring-doc.cadn.net.cn

senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
对应的对象必须兼容。

如果你使用 Spring Boot,你可以在application.properties(或称yaml)文件。 以下示例展示了如何实现:spring-doc.cadn.net.cn

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat

你只能用属性进行简单的配置。 用于更高级的配置(例如使用自定义配置)对象映射器在串行器和解串器中),你应该使用生产商和消费级工厂的构造器,这些构建器接受预设的串行器和解串器。 以下 Spring Boot 示例覆盖了默认工厂:spring-doc.cadn.net.cn

@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
    Map<String, Object> properties = new HashMap<>();
    // properties.put(..., ...)
    // ...
    return new DefaultKafkaConsumerFactory<>(properties,
        new StringDeserializer(), customValueDeserializer);
}

@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(JsonSerializer customValueSerializer) {
    return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
        new StringSerializer(), customValueSerializer);
}

此外,还提供了传球手,作为使用这些构造器的替代方案。spring-doc.cadn.net.cn

使用 Spring Boot 并覆盖消费者工厂生产工厂如上所示,需要在 BEAN 方法返回类型中使用万用符泛型。 如果提供了具体的通用类型,Spring Boot 会忽略这些豆子,仍然使用默认的豆子。

从2.2版本开始,你可以明确配置反串化器使用提供的目标类型,并通过使用带有布尔值的超载构造函数,忽略头部中的类型信息useHeadersIfPresent参数(即true默认情况下)。 以下示例展示了如何实现:spring-doc.cadn.net.cn

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
        new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

利用方法确定类型

从2.5版本开始,你现在可以通过属性配置反串行器,调用方法来确定目标类型。 如果存在,这将覆盖上述所有其他技术。 如果数据是由不使用 Spring 串行器的应用程序发布的,并且你需要根据数据或其他头部进行不同类型反序列化,这会很有用。 将这些属性设置为方法名——一个完全限定的类名,后跟方法名,中间用句号分隔.. 该方法必须声明为公共静态,有三种签名之一(字符串主题,字节[]数据,头部),(字节数据,头部)(字节[] 数据)并返回一辆JacksonJavaType.spring-doc.cadn.net.cn

你可以使用任意的头部,或者检查数据来确定类型。spring-doc.cadn.net.cn

JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);

JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);

public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
    // {"thisIsAFieldInThing1":"value", ...
    if (data[21] == '1') {
        return thing1Type;
    }
    else {
        return thing2Type;
    }
}

对于更复杂的数据检查,可以考虑使用JsonPath或类似的,但测试越简单,确定类型,过程就越高效。spring-doc.cadn.net.cn

以下是通过程序创建解串器的示例(当在构造函数中向消费级工厂提供解串器时):spring-doc.cadn.net.cn

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);

...

public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
    ...
}

程序化构建

自2.3版本起,在为生产者/消费工厂编程构建串行化器/反串行器时,可以使用流流API简化配置。spring-doc.cadn.net.cn

@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
        new JsonSerializer<MyKeyType>()
            .forKeys()
            .noTypeInfo(),
        new JsonSerializer<MyValueType>()
            .noTypeInfo());
    return pf;
}

@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
        new JsonDeserializer<>(MyKeyType.class)
            .forKeys()
            .ignoreTypeHeaders(),
        new JsonDeserializer<>(MyValueType.class)
            .ignoreTypeHeaders());
    return cf;
}

要通过程序实现类型映射,类似于“使用方法确定类型”,可以使用类型功能财产。spring-doc.cadn.net.cn

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeFunction(MyUtils::thingOneOrThingTwo);

或者,只要你不使用 fluent API 来配置属性,或者用以下方式设置属性set*()工厂将利用配置属性配置串行器/解串器;参见配置属性spring-doc.cadn.net.cn

分派串行器和解串器

使用 头部

2.3 版本引入了授权序列化器授权去序列化器,允许生成和使用不同键和/或值类型的记录。 制作人必须设置一个头部DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR到一个选择器值,用于选择该值和DelegatingSerializer.KEY序列化选择器用于钥匙;如果未找到匹配,则非法州例外被抛出。spring-doc.cadn.net.cn

对于输入记录,解串器使用相同的头部来选择所需的解串器;如果找不到匹配或没有头部,原始文件字节[]被归还。spring-doc.cadn.net.cn

你可以配置选择器的映射到串行器 / 反串化器通过构造函数,或者你可以通过 Kafka 的生产者/消费者属性配置,并带有密钥DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIGDelegatingSerializer.KEY_序列化选择器_配置. 对于序列化器,生产者属性可以是Map<String,对象>其中键是选择器,值为串行器实例,串行器或者班级名称。 该性质也可以是逗号分隔映射条目的字符串,如下所示。spring-doc.cadn.net.cn

对于解串器,消费者属性可以是Map<String,对象>其中键是选择器,值为反串化器实例,一个反串化器或者班级名称。 该性质也可以是逗号分隔映射条目的字符串,如下所示。spring-doc.cadn.net.cn

要使用属性进行配置,请使用以下语法:spring-doc.cadn.net.cn

producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")

consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")

制片人随后会设置DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR头部 至东西1东西2.spring-doc.cadn.net.cn

该技术支持将不同类型发送到同一主题(或不同主题)。spring-doc.cadn.net.cn

从2.5.1版本开始,如果类型(键或值)是支持的标准类型之一,则无需设置选择器头部塞尔德斯 (,整数,等等)。 相反,串行器会将报头设置为类型的类名。 无需为这些类型配置串行化器或解串化器,它们会动态生成一次。

关于另一种将不同类型发送到不同主题的技巧,请参见RoutingKafkaTemplate.spring-doc.cadn.net.cn

按类型划分

2.8 版本引入了DelegatingByTypeSerializer.spring-doc.cadn.net.cn

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            null, new DelegatingByTypeSerializer(Map.of(
                    byte[].class, new ByteArraySerializer(),
                    Bytes.class, new BytesSerializer(),
                    String.class, new StringSerializer())));
}

从版本 2.8.3 开始,你可以配置串行化器检查映射键是否可从目标对象分配,这在代理串行器可以序列化子类时非常有用。 在这种情况下,如果存在歧义匹配,则有序地图,例如LinkedHashMap应当提供。spring-doc.cadn.net.cn

按主题

从2.8版本开始,DelegatingByTopicSerializerDelegatingByTopicDeserializer允许根据主题名称选择串行器/解串器。 正则表达式模式s 用于查找要使用的实例。 该映射可以通过构造函数配置,或通过属性(引号分隔的列表)配置模式:序列化器).spring-doc.cadn.net.cn

producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArraySerializer.class.getName()
        + ", topic[5-9]:" + StringSerializer.class.getName());
...
consumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArrayDeserializer.class.getName()
        + ", topic[5-9]:" + StringDeserializer.class.getName());

KEY_SERIALIZATION_TOPIC_CONFIG用这个来做钥匙时,spring-doc.cadn.net.cn

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            new IntegerSerializer(),
            new DelegatingByTopicSerializer(Map.of(
                    Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
                    Pattern.compile("topic[5-9]"), new StringSerializer())),
                    new JsonSerializer<Object>());  // default
}

你可以指定默认串行器/解串器,当没有模式匹配时使用DelegatingByTopicSerialization.KEY_序列化_主题_默认DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT.spring-doc.cadn.net.cn

附加财产DelegatingByTopicSerialization.CASE_SENSITIVE(默认true),当设置为false这会让主题查找的案例变得不敏感。spring-doc.cadn.net.cn

重试反串器

RetryingDeserializer使用代理反串化器重试模板当代理在解序列过程中可能出现暂时错误(如网络问题)时,重新尝试反序列化。spring-doc.cadn.net.cn

ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
    new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
    new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));

从版本开始3.1.2一个恢复回调可以设置为RetryingDeserializer选择。spring-doc.cadn.net.cn

关于重试模板比如重试政策、退后政策等等。spring-doc.cadn.net.cn

春季消息消息转换

虽然串行器反串化器API 从底层 Kafka 出发非常简单且灵活消费者制作人从角度来看,在使用任一情况下,你可能需要在 Spring Messaging 层面提供更多灵活性@KafkaListener或者 Spring Integration 的 Apache Kafka 支持。方便你轻松地与org.springframework.messaging.MessageSpring for Apache Kafka 提供了消息转换器抽象化消息信息转换器实现及其JsonMessageConverter(以及子职业)自定义。你可以注入消息转换器变成了卡夫卡模板实例直接和摘要KafkaListenerContainerFactoryBean对@KafkaListener.containerFactory()财产。 以下示例展示了如何实现:spring-doc.cadn.net.cn

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordMessageConverter(new JsonMessageConverter());
    return factory;
}
...
@KafkaListener(topics = "jsonData",
                containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}

使用Spring Boot时,只需将转换器定义为@Bean而 Spring Boot 的自动配置会将其连接到自动配置的模板和容器工厂。spring-doc.cadn.net.cn

当你使用@KafkaListener参数类型会提供给消息转换器以辅助转换。spring-doc.cadn.net.cn

这种类型推断只有在@KafkaListener注释在方法层面声明。带有类级@KafkaListener,有效载荷类型用于选择@KafkaHandler调用方法,因此必须在选择该方法之前就已经转换好。spring-doc.cadn.net.cn

在消费者端,你可以配置JsonMessageConverter; 它能应付消费者记录类型的值字节[],字节字符串所以应与ByteArray 反串化器,字节解串器字符串解串器. (字节[]字节效率更高,因为它们避免了不必要的字节[]字符串转换)。你也可以配置特定的子类JsonMessageConverter如果你愿意,这对应的是解串器。spring-doc.cadn.net.cn

在生产者端,当你使用 Spring Integration 或KafkaTemplate.send(Message<?> message)方法(参见卡夫卡模板),你必须配置一个与配置好的Kafka兼容的消息转换器串行器.spring-doc.cadn.net.cn

同样,使用字节[]字节效率更高,因为它们避免了字符串字节[]转换。spring-doc.cadn.net.cn

为了方便起见,从2.3版本开始,该框架还提供了StringOrBytes 串行器可以串行化这三种值类型,因此可以与任一消息转换器一起使用。spring-doc.cadn.net.cn

从2.7.1版本开始,消息有效载荷转换可以委派给春季消息 智能消息转换器; 这使得转换能够基于MessageHeaders.CONTENT_TYPE页眉。spring-doc.cadn.net.cn

KafkaMessageConverter.fromMessage()该方法用于出站转换为制作人唱片消息有效载荷为ProducerRecord.value()财产。 这KafkaMessageConverter.toMessage()该方法用于从中的入站转换消费者记录有效载荷为ConsumerRecord.value()财产。 这SmartMessageConverter.toMessage()调用方法来创建一个新的出站留言<?>来自消息传球给fromMessage()(通常由KafkaTemplate.send(Message<?> msg)). 类似地,在KafkaMessageConverter.toMessage()方法,在转换器创建新的留言<?>来自消费者记录SmartMessageConverter.fromMessage()调用方法,然后用新转换的有效载荷创建最终的入站消息。无论哪种情况,如果智能消息转换器返回,使用原始消息。

当默认转换器被用于卡夫卡模板而监听器容器工厂,你配置了智能消息转换器通过呼叫setMessagingConverter()在模板上,并通过内容类型转换器@KafkaListener方法。spring-doc.cadn.net.cn

template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
    contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
    ...
}

使用 Spring 数据投影接口

从版本 2.1.1 开始,你可以将 JSON 转换为 Spring Data Projection 接口,而非具体类型。这允许对数据进行非常选择性和低耦合的绑定,包括从 JSON 文档中多个位置查找值。例如,以下接口可以定义为消息有效载荷类型:spring-doc.cadn.net.cn

interface SomeSample {

  @JsonPath({ "$.username", "$.user.name" })
  String getUsername();

}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
    String username = in.getUsername();
    ...
}

默认情况下,访问器方法将以字段形式查找收到的 JSON 文档中的属性名称。 这@JsonPath表达式允许自定义值查找,甚至可以定义多个 JSON 路径表达式,从多个位置查找值,直到表达式返回实际值。spring-doc.cadn.net.cn

要启用此功能,请使用投影消息转换器配置了合适的代理转换器(用于出站转换和非投影接口转换)。 你还必须补充Spring-data:Spring-Data-commonscom.jayway.jsonpath:json-path去上课路径。spring-doc.cadn.net.cn

当用作参数时@KafkaListener该接口类型会像正常一样自动传递给转换器。spring-doc.cadn.net.cn

ErrorHandlingDeserializer

当反串行器未能反串行消息时,Spring 无法处理该问题,因为问题发生在poll()返回。 为解决此问题,以下ErrorHandlingDeserializer已经被引入。 该解串器将任务委托给真正的反串化器(键或值)。 如果代理未能将记录内容反序列化,ErrorHandlingDeserializer返回 a值和反序列化例外在包含原因和原始字节的头部中。 当你使用记录级别时消息监听器,如果消费者记录包含一个反序列化例外键或值的头部,容器的错误处理程序与失败的消费者记录. 唱片不会传递给听众。spring-doc.cadn.net.cn

或者,你也可以配置ErrorHandlingDeserializer通过提供失败的反序列化函数,即Function<FailedDeserializationInfo, T>. 调用该函数来创建T,并以通常方式传递给听者。 一个类型的对象FailedDeserializationInfo,包含所有上下文信息,提供给函数。 你可以找到反序列化例外(作为序列化的 Java 对象)在报头中。 请参见 Javadoc 中的ErrorHandlingDeserializer更多信息请见。spring-doc.cadn.net.cn

你可以使用DefaultKafkaConsumerFactory构造器取密钥和值反串化器对象和电线在适当的位置ErrorHandlingDeserializer你用合适的代理配置的实例。 或者,您也可以使用消费者配置属性(这些属性被ErrorHandlingDeserializer)以实例化代理。 这些物业名称为ErrorHandlingDeserializer.KEY_反串行器类ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS. 属性价值可以是类别名称或类别名称。 以下示例展示了如何设置这些属性:spring-doc.cadn.net.cn

... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);

以下示例使用了一个失败的反序列化函数.spring-doc.cadn.net.cn

public class BadThing extends Thing {

  private final FailedDeserializationInfo failedDeserializationInfo;

  public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
    this.failedDeserializationInfo = failedDeserializationInfo;
  }

  public FailedDeserializationInfo getFailedDeserializationInfo() {
    return this.failedDeserializationInfo;
  }

}

public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {

  @Override
  public Thing apply(FailedDeserializationInfo info) {
    return new BadThing(info);
  }

}

前述示例使用了以下配置:spring-doc.cadn.net.cn

...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
如果消费者配置为ErrorHandlingDeserializer配置卡夫卡模板以及其制作者,配备一个既能处理普通对象也能处理原始内容的串行化器字节[]这些值是由反序列化例外产生的。 模板的通用值类型应为对象. 一种方法是使用DelegatingByTypeSerializer;以下是一个示例:
@Bean
public ProducerFactory<String, Object> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
    new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
          MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}

当使用ErrorHandlingDeserializer使用批处理监听器时,你必须检查消息头部中的反序列化异常。 当与DefaultBatchErrorHandler你可以利用该头来确定异常失败的记录,并通过BatchListenerFailedException.spring-doc.cadn.net.cn

@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
    for (int i = 0; i < in.size(); i++) {
        Thing thing = in.get(i);
        if (thing == null
                && headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
            try {
                DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
                        headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
                if (deserEx != null) {
                    logger.error(deserEx, "Record at index " + i + " could not be deserialized");
                }
            }
            catch (Exception ex) {
                logger.error(ex, "Record at index " + i + " could not be deserialized");
            }
            throw new BatchListenerFailedException("Deserialization", deserEx, i);
        }
        process(thing);
    }
}

SerializationUtils.byteArrayToDeserializationException()可以用来将头转换为反序列化例外.spring-doc.cadn.net.cn

食用时List<ConsumerRecord<?, ?>,SerializationUtils.getExceptionFromHeader()改用:spring-doc.cadn.net.cn

@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
    for (int i = 0; i < in.size(); i++) {
        ConsumerRecord<String, Thing> rec = in.get(i);
        if (rec.value() == null) {
            DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
                    SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
            if (deserEx != null) {
                logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
                throw new BatchListenerFailedException("Deserialization", deserEx, i);
            }
        }
        process(rec.value());
    }
}
如果你还在使用死信出版恢复者,该记录发布于反序列化例外将会有record.value()类型字节[];这本书不应被序列化。 考虑使用DelegatingByTypeSerializer配置为使用字节数列序列化器字节[]其他类型则使用普通串行器(Json、Avro 等)。

从3.1版本开始,你可以添加一个验证器前往ErrorHandlingDeserializer. 如果代表反串化器成功反序列化对象,但该对象验证失败,会抛出类似反序列化异常的异常。 这使得原始原始数据可以传递给错误处理程序。 自己创建反串器时,只需调用setValidator;如果你用属性配置串行器,设置 consumer 配置属性ErrorHandlingDeserializer.VALIDATOR_CLASS归入你的类别或完全合格的类别名称验证器. 使用Spring Boot时,该物业名称为spring.kafka.consumer.properties.spring.deserializer.validator.class.spring-doc.cadn.net.cn

使用批处理监听器的有效载荷转换

你也可以使用JsonMessageConverter批处理消息传递转换器当你使用批量监听器容器工厂时,可以转换批处理消息。 更多信息请参见序列化、反序列化与消息转换以及Spring消息消息转换spring-doc.cadn.net.cn

默认情况下,转换类型是从监听者参数推断的。 如果你配置了JsonMessageConverter其中默认Jackson2类型Mapper类型优先权设置为TYPE_ID(而不是默认推断),转换器则使用头部中的类型信息(如果存在的话)。 例如,这允许监听器方法声明接口而非具体类。 此外,类型转换器支持映射,因此反序列化可以进行与源不同的类型(只要数据兼容)。 这在使用时也很有用班级层级@KafkaListener实例其中有效载荷必须已经转换,以确定调用哪种方法。 以下示例创建了采用此方法的豆子:spring-doc.cadn.net.cn

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
    return factory;
}

@Bean
public JsonMessageConverter converter() {
    return new JsonMessageConverter();
}

注意,为了实现这一功能,转换目标的方法签名必须是具有单一通用参数类型的容器对象,例如如下:spring-doc.cadn.net.cn

@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

注意你仍然可以访问批处理首部。spring-doc.cadn.net.cn

如果批处理转换器有支持该功能的记录转换器,你还可以收到一份消息列表,显示有效载荷根据通用类型转换。 以下示例展示了如何实现:spring-doc.cadn.net.cn

@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
    ...
}

如果批中的记录无法转换,则其有效载荷设置为进入目标负载列表。 转换异常会作为警告记录记录,并存储在KafkaHeaders.CONVERSION_FAILURES作为List<ConversionException>. 目标@KafkaListener方法可以执行 Java用API过滤掉这些或对 Conversion Exceptions 头部做些作:spring-doc.cadn.net.cn

@KafkaListener(id = "foo", topics = "foo", autoStartup = "false")
public void listen(List<Foo> list,
         @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> conversionFailures) {

    for (int i = 0; i < list.size(); i++) {
        if (conversionFailures.get(i) != null) {
            throw new BatchListenerFailedException("Conversion Failed", conversionFailures.get(i), i);
        }
    }
}

转换服务定制

从版本 2.1.1 开始,org.springframework.core.convert.ConversionService默认使用org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory为解析调用监听器方法的参数,包含所有实现以下任一接口的 BEANS:spring-doc.cadn.net.cn

这让你可以在不更改默认配置的情况下进一步自定义监听器反序列化消费者工厂KafkaListenerContainerFactory.spring-doc.cadn.net.cn

设置自定义MessageHandlerMethodFactoryKafkaListenerEndpointRegistrar通过一个KafkaListenerConfigurerBean会禁用该功能。

添加自定义HandlerMethodArgumentResolver@KafkaListener

从2.4.2版本开始,你可以添加自己的HandlerMethodArgumentResolver并解析自定义方法参数。 你只需要实现KafkaListenerConfigurer和使用方法setCustomMethodArgumentResolvers()来自课堂KafkaListenerEndpointRegistrar.spring-doc.cadn.net.cn

@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setCustomMethodArgumentResolvers(
            new HandlerMethodArgumentResolver() {

                @Override
                public boolean supportsParameter(MethodParameter parameter) {
                    return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
                }

                @Override
                public Object resolveArgument(MethodParameter parameter, Message<?> message) {
                    return new CustomMethodArgument(
                        message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
                    );
                }
            }
        );
    }

}

你也可以通过添加自定义代码完全替换框架的参数解析MessageHandlerMethodFactory前往KafkaListenerEndpointRegistrar豆。 如果你这样做,且你的应用需要处理墓碑记录,使用 value()(例如从压缩主题中),你应该添加一个KafkaNullAwarePayloadArgumentResolver工厂;它必须是最后一个解析器,因为它支持所有类型,并且可以匹配无需@Payload注解。 如果你正在使用DefaultMessageHandlerMethodFactory,将该解析器设为最后一个自定义解析器;工厂会确保该解析器在标准之前被使用。PayloadMethodArgumentResolver,该 不了解卡夫卡无负载。spring-doc.cadn.net.cn