Spring kafka 2.2 类型映射 class 加载程序不匹配

Spring kafka 2.2 type mappings class loader mismatch

我正在尝试使用 Spring Kafka 2.2 中引入的新类型映射功能:

向下滚动到 "Mapping Types":
https://docs.spring.io/spring-kafka/reference/htmlsingle/#serdes

在生产者方面,我注册了一个映射如下:

senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "foo:com.myfoo.Foo");

消费者如下:

consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeSerializer.TYPE_MAPPINGS, "foo:com.yourfoo.Foo");

但是,在com.myfoo.Foo class的生产者端向Kafka发送事件时,添加到记录中的classId header是com.myfoo.Foo 而不是 foo。因此在消费者方面,它无法反序列化,因为 com.myfoo.Foo 是未知的。

我已将问题缩小到 spring-kafka 中的此方法:

https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/converter/AbstractJavaTypeMapper.java#L142

protected void addHeader(Headers headers, String headerName, Class<?> clazz) {
    if (this.classIdMapping.containsKey(clazz)) {
        headers.add(new RecordHeader(headerName, this.classIdMapping.get(clazz)));
    }
    else {
        headers.add(new RecordHeader(headerName, clazz.getName().getBytes(StandardCharsets.UTF_8)));
    }
}

通过执行kafka记录的序列化进行调试时,执行实际上进入了else分支,而在我的理解中它实际上应该进入if分支并添加foo作为header。相反,它将 com.myfoo.Foo 添加到 header.

罪魁祸首似乎与 class 加载程序不匹配有关,但我不确定它是否真的是错误或我在这方面做的愚蠢的事情。

基本上,classIdMapping 映射正确填充了 com.myfoo.Foo 作为键,foo 的 UTF-8 字节 [] 表示作为相应的值。

然而,在 if check 期间,clazz 参数是一个 Class 由不同的 class 加载器加载,作为 classIdMapping 映射中存储的内容,因此哈希码不同,它会转到 else 分支。

它实际上是 String-Kafka 方面的错误还是我配置错误?

谢谢

这看起来确实是我们实施中的一个遗漏。请为 Apache Kafka 项目 Spring 提出问题:https://github.com/spring-projects/spring-kafka。我们的想法是不使用 Class<?> 进行映射,而是使用 Fully Qualified Class Name

您的应用程序似乎是多class加载程序,例如它是网络一。 Apache Kafka 客户端在其 ClassLoader 中加载 senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);,同时其他所有内容均由应用程序上下文 ClassLoader 完成。

作为解决方法,同时我们解决了这个问题,我们建议使用 DefaultKafkaProducerFactory 配置及其 setValueSerializer() 来填充 JsonSerializer bean 引用。这样,您需要通过其 setTypeMapper() 填充 class 映射,因此, DefaultJackson2JavaTypeMapper.

上的 setIdClassMapping()