串行化、反串化与消息转换
概述
Apache Kafka 提供了一个用于序列化和反序列化记录值及其键的高级 API。
它与org.apache.kafka.common.serialization.Serializer<T>和org.apache.kafka.common.serialization.Deserializer<T>带有一些内置实现的抽象。
同时,我们可以通过以下方式来指定串行器和反串行器类制作人或消费者配置属性。
以下示例展示了如何实现:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
对于更复杂或特殊的情况,卡夫卡消费者(因此,卡夫卡制片人) 提供过载
构造者接受串行器和反串化器的实例钥匙和值分别。
当你使用这个 API 时,默认KafkaProducerFactory和DefaultKafkaConsumerFactory还要通过构造函数或设定器方法提供属性以注入自定义串行器和反串化器实例进入目标制作人或消费者.
另外,你也可以通过提供商<序列号生成器>或提供商<去串行器>通过构造子实现实例——这些提供商每个 的创建时 都被调用制作人或消费者.
字符串序列化
自2.5版本起,Spring for Apache Kafka 提供了以下内容ToStringSerializer和解析字符串反序列化器使用String表示实体的类。
他们依赖于各种方法toString以及一些Function<String>或双功能<字符串,头部>解析字符串并填充实例的属性。
通常,这会调用类上的某种静态方法,例如解析:
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
默认情况下,ToStringSerializer配置为传递记录中序列化实体的类型信息头.
你可以通过设置addTypeInfo属性到false.
这些信息可以被解析字符串反序列化器在接受方。
-
ToStringSerializer.ADD_TYPE_INFO_HEADERS(默认true你可以设置为false在ToStringSerializer(设置addTypeInfo财产)。
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);
if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
你可以配置字符集用于转换字符串来/回字节[]默认为UTF-8.
你可以用解析器方法的名称配置解串器,使用ConsumerConfig性能:
-
ParseStringDeserializer.KEY帕瑟 -
ParseStringDeserializer.VALUE_PARSER
属性必须包含类的完整限定名称,后面是方法名,中间用句点分隔..
该方法必须是静态的,并且符号为(字符串,头部)或(弦乐).
一个致StringSerde也提供与Kafka Streams一起使用的版本。
JSON
Spring for Apache Kafka 也提供JacksonJsonSerializer和JacksonJsonDeserializer基于
Jackson JSON 对象映射器。
这JacksonJsonSerializer允许将任意 Java 对象写成 JSON字节[].
这JacksonJsonDeserializer需要额外的职业<?>目标类型允许对被消耗的 进行反序列化的参数字节[]到正确的目标物体。
以下示例展示了如何创建JacksonJsonDeserializer:
JacksonJsonDeserializer<Thing> thingDeserializer = new JacksonJsonDeserializer<>(Thing.class);
你可以自定义两者JacksonJsonSerializer和JacksonJsonDeserializer带有对象映射器.
你也可以扩展它们,实现某些特定的配置逻辑configure(Map<String, ?> configs, boolean isKey)方法。
从版本 2.3 开始,所有支持 JSON 的组件默认配置为JacksonUtils.enhancedObjectMapper()实例,伴随MapperFeature.DEFAULT_VIEW_INCLUSION和DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES功能被禁用。
此外,这样的实例还配备了已知的自定义数据类型模块,如 Java 时间和 Kotlin 支持。
看JacksonUtils.enhancedObjectMapper()更多信息请参见JavaDocs。
该方法还会注册org.springframework.kafka.support.JacksonMimeTypeModule为org.springframework.util.MimeType将对象序列化为纯字符串,以便通过网络实现平台间兼容性。
一个JacksonMime类型模块可以在应用上下文中注册为 BEAN,并且会自动配置为Spring靴对象映射器实例.
同样从2.3版本开始,JsonDeserializer提供类型参考基于 的构造器,用于更好地处理目标通用容器类型。
从2.1版本开始,你可以在记录中传递类型信息头允许处理多种类型的作。
此外,你还可以通过以下 Kafka 属性配置串行器和反串行器。
如果你已经提供了,这些条款就没有影响串行器和反串化器的实例卡夫卡消费者和卡夫卡制片人分别。
配置属性
-
JacksonJsonSerializer.ADD_TYPE_INFO_HEADERS(默认true你可以设置为false在JacksonJsonSerializer(设置addTypeInfo财产)。 -
JacksonJsonSerializer.TYPE_MAPPINGS(默认empty:参见映射类型。 -
JacksonJsonDeserializer.USE_TYPE_INFO_HEADERS(默认true你可以设置为false忽略串行器设置的头部。 -
JacksonJsonDeserializer.REMOVE_TYPE_INFO_HEADERS(默认true你可以设置为false以保留串行器设置的头部。 -
JacksonJsonDeserializer.KEY_默认类型:如果没有头部信息,用于键的反序列化的备份类型。 -
JacksonJsonDeserializer.VALUE_DEFAULT_TYPE:如果没有头部信息,用于反序列化的备份类型。 -
JacksonJsonDeserializer.TRUSTED_PACKAGES(默认java.util,java.lang): 允许反序列化的包模式列表。 意味着全部去序列化。* -
JacksonJsonDeserializer.TYPE_MAPPINGS(默认empty:参见映射类型。 -
JacksonJsonDeserializer.KEY类型方法(默认empty参见“利用方法确定类型”。 -
JacksonJsonDeserializer.VALUE_TYPE_METHOD(默认empty参见“利用方法确定类型”。
从2.2版本开始,类型信息头部(如果由串行器添加)会被解串器移除。
你可以通过设置removeTypeHeaders属性到false,要么直接在反串器上,要么带有前面描述的配置性质。
从版本 2.8 开始,如果你按照程序化构建方式构建序列化器或反串行化器,工厂会应用上述属性,只要你没有明确设置任何属性(使用set*()方法或使用 fluent API)。
此前,在程序化创建时,配置属性从未应用;如果你直接在对象上显式设置属性,情况依然如此。 |
映射类型
从2.2版本开始,使用JSON时,可以通过使用前面列表中的属性来提供类型映射。
以前,你需要在串行器和解串器中自定义类型映射器。
映射由一个逗号分隔的列表组成token:className对。
在出站时,有效载荷的类名会映射到对应的Tokens。
在入站时,类型头中的Tokens映射到对应的类名。
以下示例创建一组映射:
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
| 对应的对象必须兼容。 |
如果你使用 Spring Boot,你可以在application.properties(或称yaml)文件。
以下示例展示了如何实现:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
|
你只能用属性进行简单的配置。
用于更高级的配置(例如使用自定义配置)
此外,还提供了传球手,作为使用这些构造器的替代方案。 |
使用 Spring Boot 并覆盖消费者工厂和生产工厂如上所示,需要在 BEAN 方法返回类型中使用万用符泛型。
如果提供了具体的通用类型,Spring Boot 会忽略这些豆子,仍然使用默认的豆子。 |
从2.2版本开始,你可以明确配置反串化器使用提供的目标类型,并通过使用带有布尔值的超载构造函数,忽略头部中的类型信息useHeadersIfPresent参数(即true默认情况下)。
以下示例展示了如何实现:
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
利用方法确定类型
从2.5版本开始,你现在可以通过属性配置反串行器,调用方法来确定目标类型。
如果存在,这将覆盖上述所有其他技术。
如果数据是由不使用 Spring 串行器的应用程序发布的,并且你需要根据数据或其他头部进行不同类型反序列化,这会很有用。
将这些属性设置为方法名——一个完全限定的类名,后跟方法名,中间用句号分隔..
该方法必须声明为公共静态,有三种签名之一(字符串主题,字节[]数据,头部),(字节数据,头部)或(字节[] 数据)并返回一辆JacksonJavaType.
-
JsonDeserializer.KEY类型方法:spring.json.key.type.method -
JsonDeserializer.VALUE_TYPE_METHOD:spring.json.value.type.method
你可以使用任意的头部,或者检查数据来确定类型。
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);
JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
对于更复杂的数据检查,可以考虑使用JsonPath或类似的,但测试越简单,确定类型,过程就越高效。
以下是通过程序创建解串器的示例(当在构造函数中向消费级工厂提供解串器时):
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
...
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
程序化构建
自2.3版本起,在为生产者/消费工厂编程构建串行化器/反串行器时,可以使用流流API简化配置。
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
要通过程序实现类型映射,类似于“使用方法确定类型”,可以使用类型功能财产。
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
或者,只要你不使用 fluent API 来配置属性,或者用以下方式设置属性set*()工厂将利用配置属性配置串行器/解串器;参见配置属性。
分派串行器和解串器
使用 头部
2.3 版本引入了授权序列化器和授权去序列化器,允许生成和使用不同键和/或值类型的记录。
制作人必须设置一个头部DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR到一个选择器值,用于选择该值和DelegatingSerializer.KEY序列化选择器用于钥匙;如果未找到匹配,则非法州例外被抛出。
对于输入记录,解串器使用相同的头部来选择所需的解串器;如果找不到匹配或没有头部,原始文件字节[]被归还。
你可以配置选择器的映射到串行器 / 反串化器通过构造函数,或者你可以通过 Kafka 的生产者/消费者属性配置,并带有密钥DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG和DelegatingSerializer.KEY_序列化选择器_配置.
对于序列化器,生产者属性可以是Map<String,对象>其中键是选择器,值为串行器实例,串行器类或者班级名称。
该性质也可以是逗号分隔映射条目的字符串,如下所示。
对于解串器,消费者属性可以是Map<String,对象>其中键是选择器,值为反串化器实例,一个反串化器类或者班级名称。
该性质也可以是逗号分隔映射条目的字符串,如下所示。
要使用属性进行配置,请使用以下语法:
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
制片人随后会设置DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR头部 至东西1或东西2.
该技术支持将不同类型发送到同一主题(或不同主题)。
从2.5.1版本开始,如果类型(键或值)是支持的标准类型之一,则无需设置选择器头部塞尔德斯 (长,整数,等等)。
相反,串行器会将报头设置为类型的类名。
无需为这些类型配置串行化器或解串化器,它们会动态生成一次。 |
关于另一种将不同类型发送到不同主题的技巧,请参见用RoutingKafkaTemplate.
按类型划分
2.8 版本引入了DelegatingByTypeSerializer.
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
从版本 2.8.3 开始,你可以配置串行化器检查映射键是否可从目标对象分配,这在代理串行器可以序列化子类时非常有用。
在这种情况下,如果存在歧义匹配,则有序地图,例如LinkedHashMap应当提供。
按主题
从2.8版本开始,DelegatingByTopicSerializer和DelegatingByTopicDeserializer允许根据主题名称选择串行器/解串器。
正则表达式模式s 用于查找要使用的实例。
该映射可以通过构造函数配置,或通过属性(引号分隔的列表)配置模式:序列化器).
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
consumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
用KEY_SERIALIZATION_TOPIC_CONFIG用这个来做钥匙时,
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
new IntegerSerializer(),
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
你可以指定默认串行器/解串器,当没有模式匹配时使用DelegatingByTopicSerialization.KEY_序列化_主题_默认和DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT.
附加财产DelegatingByTopicSerialization.CASE_SENSITIVE(默认true),当设置为false这会让主题查找的案例变得不敏感。
重试反串器
这RetryingDeserializer使用代理反串化器和重试模板当代理在解序列过程中可能出现暂时错误(如网络问题)时,重新尝试反序列化。
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
恢复回调应设置为RetryingDeserializer,如果所有重试都用尽,则返回一个备用对象。
春季消息消息转换
虽然串行器和反串化器API 从底层 Kafka 出发非常简单且灵活消费者和制作人从角度来看,在使用任一情况下,你可能需要在 Spring Messaging 层面提供更多灵活性@KafkaListener或者 Spring Integration 的 Apache Kafka 支持。
这样你就能轻松地与org.springframework.messaging.MessageSpring for Apache Kafka 提供了消息转换器抽象化消息信息转换器实现及其JacksonJsonMessageConverter(以及子职业)自定义。
你可以注射消息转换器变成了卡夫卡模板实例直接和摘要KafkaListenerContainerFactoryBean对@KafkaListener.containerFactory()财产。
以下示例展示了如何实现:
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
使用Spring Boot时,只需将转换器定义为@Bean而 Spring Boot 的自动配置会将其连接到自动配置的模板和容器工厂。
当你使用@KafkaListener参数类型会提供给消息转换器以辅助转换。
|
这种类型推断只有在 |
|
在消费者端,你可以配置 在生产者端,当你使用 Spring Integration 或
同样,使用 为了方便起见,从2.3版本开始,该框架还提供了 |
从2.7.1版本开始,消息有效载荷转换可以委派给春季消息 智能消息转换器;这使得转换能够基于MessageHeaders.CONTENT_TYPE页眉。
这KafkaMessageConverter.fromMessage()该方法用于出站转换为制作人唱片消息有效载荷为ProducerRecord.value()财产。
这KafkaMessageConverter.toMessage()该方法用于从中的入站转换消费者记录有效载荷为ConsumerRecord.value()财产。
这SmartMessageConverter.toMessage()调用方法来创建一个新的出站留言<?>来自消息传球给fromMessage()(通常由KafkaTemplate.send(Message<?> msg)).
类似地,在KafkaMessageConverter.toMessage()方法,在转换器创建新的留言<?>来自消费者记录这SmartMessageConverter.fromMessage()调用方法,然后用新转换的有效载荷创建最终的入站消息。
无论哪种情况,如果智能消息转换器返回零,使用原始消息。 |
当默认转换器被用于卡夫卡模板而监听器容器工厂,你配置了智能消息转换器通过呼叫setMessagingConverter()在模板上,并通过内容类型转换器在@KafkaListener方法。
例子:
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
使用 Spring 数据投影接口
从2.1.1版本开始,你可以将JSON转换为Spring Data Projection接口,而不是具体类型。 这允许对数据进行非常选择性和低耦合的绑定,包括从JSON文档中多个位置查找值。 例如,以下接口可以定义为消息有效载荷类型:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
默认情况下,访问器方法将以字段形式查找收到的 JSON 文档中的属性名称。
这@JsonPath表达式允许自定义值查找,甚至可以定义多个 JSON 路径表达式,从多个位置查找值,直到表达式返回实际值。
要启用此功能,请使用Jackson投影消息转换器配置为合适的代理转换器(用于出站转换和非投影接口转换)。你还必须添加Spring-data:Spring-Data-commons和com.jayway.jsonpath:json-path去上课路径。
当用作参数时@KafkaListener该接口类型会像正常一样自动传递给转换器。
用ErrorHandlingDeserializer
当反串行器未能反串行消息时,Spring 无法处理该问题,因为问题发生在poll()返回。 为解决此问题,以下ErrorHandlingDeserializer已被引入。该解串器将委托给真实的反串行器(键或值)。如果代理未能对记录内容进行反序列化,则ErrorHandlingDeserializer返回 a零值和反序列化例外在包含原因和原始字节的头部中。当你使用记录级别时消息监听器,如果消费者记录包含一个反序列化例外键或值的头部,容器的错误处理程序与失败的消费者记录. 唱片不会传递给听众。
或者,你也可以配置ErrorHandlingDeserializer通过提供失败的反序列化函数,即Function<FailedDeserializationInfo, T>. 调用该函数来创建T,以通常方式传递给监听者。一个类型的对象FailedDeserializationInfo,包含所有上下文信息,提供给函数。你可以找到反序列化例外(作为序列化的 Java 对象)在首部中。参见 Javadoc 中的ErrorHandlingDeserializer更多信息请见。
你可以使用DefaultKafkaConsumerFactory构造器取密钥和值反串化器对象和电线在适当的位置ErrorHandlingDeserializer你已经配置了合适的代理实例。或者,你也可以使用消费者配置属性(这些属性被ErrorHandlingDeserializer)以实例化代理。属性名称为ErrorHandlingDeserializer.KEY_反串行器类和ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS. 属性值可以是类或类名。以下示例展示了如何设置这些属性:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
以下示例使用了一个失败的反序列化函数.
public class BadThing extends Thing {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {
@Override
public Thing apply(FailedDeserializationInfo info) {
return new BadThing(info);
}
}
前述示例使用了以下配置:
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
如果消费者配置为ErrorHandlingDeserializer配置卡夫卡模板以及其制作者,配备一个既能处理普通对象也能处理原始内容的串行化器字节[]这些值是由反序列化异常产生的。模板的通用值类型应为对象. 一种方法是使用DelegatingByTypeSerializer; 以下是一个示例: |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
当使用ErrorHandlingDeserializer使用批处理监听器时,必须检查消息头中的反序列化异常。当与DefaultBatchErrorHandler你可以利用该头来确定异常失败的记录,并通过BatchListenerFailedException.
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
try {
DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
}
catch (Exception ex) {
logger.error(ex, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
SerializationUtils.byteArrayToDeserializationException()可以用来将头转换为反序列化例外.
食用时List<ConsumerRecord<?, ?>,SerializationUtils.getExceptionFromHeader()改用:
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
如果你还在使用死信出版恢复者,该记录发布于反序列化例外将会有record.value()类型字节[]; 这不应被序列化。考虑使用DelegatingByTypeSerializer配置为使用字节数列序列化器为字节[]其他类型则使用普通串行器(Json、Avro 等)。 |
从3.1版本开始,你可以添加一个验证器前往ErrorHandlingDeserializer. 如果代表反串化器成功反序列化对象,但该对象验证失败,会抛出类似反序列化异常的异常。这允许原始原始数据传递给错误处理程序。自己创建反串化器时,只需调用setValidator; 如果你用属性配置串行器,设置 consumer 配置属性ErrorHandlingDeserializer.VALIDATOR_CLASS归入你的类别或完全合格的类别名称验证器.
使用Spring Boot时,该物业名称为spring.kafka.consumer.properties.spring.deserializer.validator.class.
使用批处理监听器的有效载荷转换
你也可以使用JacksonJsonMessageConverter在批处理消息传递转换器当你使用批量监听器容器工厂时,可以转换批处理消息。
更多信息请参见序列化、反序列化与消息转换以及Spring消息消息转换。
默认情况下,转换类型是从监听者参数推断的。
如果你配置了JacksonJsonMessageConverter其中默认Jackson2类型Mapper其类型优先权设置为TYPE_ID(而不是默认推断),转换器则使用头部中的类型信息(如果存在的话)。
例如,这允许监听器方法声明接口而非具体类。
此外,类型转换器支持映射,因此反序列化可以进行与源不同的类型(只要数据兼容)。
这在使用时也很有用班级层级@KafkaListener实例其中有效载荷必须已经转换,以确定调用哪种方法。
以下示例创建了采用此方法的豆子:
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
注意,为了实现这一功能,转换目标的方法签名必须是具有单一通用参数类型的容器对象,例如如下:
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
注意你仍然可以访问批处理首部。
如果批处理转换器有支持该功能的记录转换器,你还可以收到一份消息列表,显示有效载荷根据通用类型转换。 以下示例展示了如何实现:
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
...
}
如果批中的记录无法转换,则其有效载荷设置为零进入目标负载列表。
转换异常会作为警告记录记录,并存储在KafkaHeaders.CONVERSION_FAILURES作为List<ConversionException>.
目标@KafkaListener方法可以执行 Java流用API过滤掉这些零或对 Conversion Exceptions 头部做些作:
@KafkaListener(id = "foo", topics = "foo", autoStartup = "false")
public void listen(List<Foo> list,
@Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> conversionFailures) {
for (int i = 0; i < list.size(); i++) {
if (conversionFailures.get(i) != null) {
throw new BatchListenerFailedException("Conversion Failed", conversionFailures.get(i), i);
}
}
}
转换服务定制
从版本 2.1.1 开始,org.springframework.core.convert.ConversionService默认使用org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory为解析调用监听器方法的参数,包含所有实现以下任一接口的 BEANS:
-
org.springframework.core.convert.converter.Converter -
org.springframework.core.convert.converter.GenericConverter -
org.springframework.format.Formatter
这让你可以在不更改默认配置的情况下进一步自定义监听器反序列化消费者工厂和KafkaListenerContainerFactory.
设置自定义MessageHandlerMethodFactory在KafkaListenerEndpointRegistrar通过一个KafkaListenerConfigurerBean会禁用该功能。 |
添加自定义HandlerMethodArgumentResolver自@KafkaListener
从2.4.2版本开始,你可以添加自己的HandlerMethodArgumentResolver并解析自定义方法参数。
你只需要实现KafkaListenerConfigurer和使用方法setCustomMethodArgumentResolvers()来自课堂KafkaListenerEndpointRegistrar.
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}
}
你也可以通过添加自定义代码完全替换框架的参数解析MessageHandlerMethodFactory前往KafkaListenerEndpointRegistrar豆。
如果你这样做,且你的应用需要处理墓碑记录,使用零 value()(例如从压缩主题中),你应该添加一个KafkaNullAwarePayloadArgumentResolver工厂;它必须是最后一个解析器,因为它支持所有类型,并且可以匹配无需@Payload注解。
如果你正在使用DefaultMessageHandlerMethodFactory,将该解析器设为最后一个自定义解析器;工厂会确保该解析器在标准之前被使用。PayloadMethodArgumentResolver,该 不了解卡夫卡无负载。