在 Kafka JSON 序列化中将超类型作为类型信息发送

Send supertype as type information in Kafka JSON Serialization

我正在使用 Spring Boot 使用 Kafka 将数据从一个应用程序发送到另一个应用程序。

我的设计使用一个接口来声明正在发送的数据:

package domain;

interface Data {
    public String getData();
    public void setData(String data);
}

制作人

在源应用程序中,我将此接口实现为数据库实体。

package persistence;

@Data
class DataEntity implements Data {
    private String data;  // lombok generates getter/setters
}

添加实体后,我想使用 KafkaTemplate

将其作为更新发送到 Kafka
@Component
class DataPublisher implements ApplicationListener<DataEvent> {
    @Autowired private KafkaTemplate<String,Data> template;

    // I left out DataEvent which is a straightforward ApplicationEvent override
    @EventListener(classes = DataEvent.class)
    public void onApplicationEvent(DataEvent event) {
        template.send("data", (Data) event.getSource());
    }
}
// triggered by this call in a service
    eventPublisher.publishEvent(new DataEvent(updatedData));

序列化是通过属性完成的

spring:
    kafka:
        consumer:
            value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
            properties.spring.json.value.default.type: domain.Data

查看kafkacat输出,数据发送正常。

消费者

接收端我有

@KafkaListener(topics = "data")
public void dataUpdated(@Payload Data data) {
    dataService.updateData(data);
}

这导致

Caused by: java.lang.IllegalArgumentException: The class 'persistence.DataEntity' is not in the trusted packages [...]

我完全理解 - 序列化器发送 persistence.DataEntity object,但客户端期望 domain.Data object。但这就是设计应该的样子;我希望客户端只知道域包,而不是它的 persistence 实现。 (作为附带问题,我在哪里可以看到这种类型 header?它不在编码的 json AFAICT 中,我错过了什么?)

所以问题是:如何强制 Spring JsonDeserializer 发送 domain.Data 作为序列化数据类型?

我确实在序列化程序 class 中找到了一个 TYPE_MAPPING 属性,但它唯一的文档是它 "add[s] type mappings to the type mapper: 'foo:com.Foo,bar:com.Bar'",它没有解释任何东西,我可以'找不到示例用法。

编辑:

我确实添加了

spring.kafka.producer.properties.spring.json.type.mapping=domain.Data:persistence.DataEntity

生产者的财产,但这没有帮助。

the documentation

你必须在两边都提供映射。

但是,您不应使用 JsonDeserializer,而应使用 BytesDeserializerBytesJsonMessageConverter(只需添加一个作为 @Bean,Boot 会将其连接到容器工厂)。

这样框架会自动转换成参数类型

再次参见 the documentation