Spring kafka 2.2 类型映射 class 加载程序不匹配
Spring kafka 2.2 type mappings class loader mismatch
我正在尝试使用 Spring Kafka 2.2 中引入的新类型映射功能:
向下滚动到 "Mapping Types":
https://docs.spring.io/spring-kafka/reference/htmlsingle/#serdes
在生产者方面,我注册了一个映射如下:
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "foo:com.myfoo.Foo");
消费者如下:
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeSerializer.TYPE_MAPPINGS, "foo:com.yourfoo.Foo");
但是,在com.myfoo.Foo
class的生产者端向Kafka发送事件时,添加到记录中的classId header是com.myfoo.Foo
而不是 foo
。因此在消费者方面,它无法反序列化,因为 com.myfoo.Foo
是未知的。
我已将问题缩小到 spring-kafka 中的此方法:
protected void addHeader(Headers headers, String headerName, Class<?> clazz) {
if (this.classIdMapping.containsKey(clazz)) {
headers.add(new RecordHeader(headerName, this.classIdMapping.get(clazz)));
}
else {
headers.add(new RecordHeader(headerName, clazz.getName().getBytes(StandardCharsets.UTF_8)));
}
}
通过执行kafka记录的序列化进行调试时,执行实际上进入了else分支,而在我的理解中它实际上应该进入if分支并添加foo
作为header。相反,它将 com.myfoo.Foo
添加到 header.
罪魁祸首似乎与 class 加载程序不匹配有关,但我不确定它是否真的是错误或我在这方面做的愚蠢的事情。
基本上,classIdMapping
映射正确填充了 com.myfoo.Foo
作为键,foo
的 UTF-8 字节 [] 表示作为相应的值。
然而,在 if check 期间,clazz
参数是一个 Class 由不同的 class 加载器加载,作为 classIdMapping
映射中存储的内容,因此哈希码不同,它会转到 else 分支。
它实际上是 String-Kafka 方面的错误还是我配置错误?
谢谢
这看起来确实是我们实施中的一个遗漏。请为 Apache Kafka 项目 Spring 提出问题:https://github.com/spring-projects/spring-kafka。我们的想法是不使用 Class<?>
进行映射,而是使用 Fully Qualified Class Name。
您的应用程序似乎是多class加载程序,例如它是网络一。 Apache Kafka 客户端在其 ClassLoader 中加载 senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
,同时其他所有内容均由应用程序上下文 ClassLoader 完成。
作为解决方法,同时我们解决了这个问题,我们建议使用 DefaultKafkaProducerFactory
配置及其 setValueSerializer()
来填充 JsonSerializer
bean 引用。这样,您需要通过其 setTypeMapper()
填充 class 映射,因此, DefaultJackson2JavaTypeMapper
.
上的 setIdClassMapping()
我正在尝试使用 Spring Kafka 2.2 中引入的新类型映射功能:
向下滚动到 "Mapping Types":
https://docs.spring.io/spring-kafka/reference/htmlsingle/#serdes
在生产者方面,我注册了一个映射如下:
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "foo:com.myfoo.Foo");
消费者如下:
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeSerializer.TYPE_MAPPINGS, "foo:com.yourfoo.Foo");
但是,在com.myfoo.Foo
class的生产者端向Kafka发送事件时,添加到记录中的classId header是com.myfoo.Foo
而不是 foo
。因此在消费者方面,它无法反序列化,因为 com.myfoo.Foo
是未知的。
我已将问题缩小到 spring-kafka 中的此方法:
protected void addHeader(Headers headers, String headerName, Class<?> clazz) {
if (this.classIdMapping.containsKey(clazz)) {
headers.add(new RecordHeader(headerName, this.classIdMapping.get(clazz)));
}
else {
headers.add(new RecordHeader(headerName, clazz.getName().getBytes(StandardCharsets.UTF_8)));
}
}
通过执行kafka记录的序列化进行调试时,执行实际上进入了else分支,而在我的理解中它实际上应该进入if分支并添加foo
作为header。相反,它将 com.myfoo.Foo
添加到 header.
罪魁祸首似乎与 class 加载程序不匹配有关,但我不确定它是否真的是错误或我在这方面做的愚蠢的事情。
基本上,classIdMapping
映射正确填充了 com.myfoo.Foo
作为键,foo
的 UTF-8 字节 [] 表示作为相应的值。
然而,在 if check 期间,clazz
参数是一个 Class 由不同的 class 加载器加载,作为 classIdMapping
映射中存储的内容,因此哈希码不同,它会转到 else 分支。
它实际上是 String-Kafka 方面的错误还是我配置错误?
谢谢
这看起来确实是我们实施中的一个遗漏。请为 Apache Kafka 项目 Spring 提出问题:https://github.com/spring-projects/spring-kafka。我们的想法是不使用 Class<?>
进行映射,而是使用 Fully Qualified Class Name。
您的应用程序似乎是多class加载程序,例如它是网络一。 Apache Kafka 客户端在其 ClassLoader 中加载 senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
,同时其他所有内容均由应用程序上下文 ClassLoader 完成。
作为解决方法,同时我们解决了这个问题,我们建议使用 DefaultKafkaProducerFactory
配置及其 setValueSerializer()
来填充 JsonSerializer
bean 引用。这样,您需要通过其 setTypeMapper()
填充 class 映射,因此, DefaultJackson2JavaTypeMapper
.
setIdClassMapping()