在 Kafka JSON 序列化中将超类型作为类型信息发送
Send supertype as type information in Kafka JSON Serialization
我正在使用 Spring Boot 使用 Kafka 将数据从一个应用程序发送到另一个应用程序。
我的设计使用一个接口来声明正在发送的数据:
package domain;
interface Data {
public String getData();
public void setData(String data);
}
制作人
在源应用程序中,我将此接口实现为数据库实体。
package persistence;
@Data
class DataEntity implements Data {
private String data; // lombok generates getter/setters
}
添加实体后,我想使用 KafkaTemplate
将其作为更新发送到 Kafka
@Component
class DataPublisher implements ApplicationListener<DataEvent> {
@Autowired private KafkaTemplate<String,Data> template;
// I left out DataEvent which is a straightforward ApplicationEvent override
@EventListener(classes = DataEvent.class)
public void onApplicationEvent(DataEvent event) {
template.send("data", (Data) event.getSource());
}
}
// triggered by this call in a service
eventPublisher.publishEvent(new DataEvent(updatedData));
序列化是通过属性完成的
spring:
kafka:
consumer:
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties.spring.json.value.default.type: domain.Data
查看kafkacat输出,数据发送正常。
消费者
接收端我有
@KafkaListener(topics = "data")
public void dataUpdated(@Payload Data data) {
dataService.updateData(data);
}
这导致
Caused by: java.lang.IllegalArgumentException: The class 'persistence.DataEntity' is not in the trusted packages [...]
我完全理解 - 序列化器发送 persistence.DataEntity
object,但客户端期望 domain.Data
object。但这就是设计应该的样子;我希望客户端只知道域包,而不是它的 persistence
实现。 (作为附带问题,我在哪里可以看到这种类型 header?它不在编码的 json AFAICT 中,我错过了什么?)
所以问题是:如何强制 Spring JsonDeserializer
发送 domain.Data
作为序列化数据类型?
我确实在序列化程序 class 中找到了一个 TYPE_MAPPING 属性,但它唯一的文档是它 "add[s] type mappings to the type mapper: 'foo:com.Foo,bar:com.Bar'",它没有解释任何东西,我可以'找不到示例用法。
编辑:
我确实添加了
spring.kafka.producer.properties.spring.json.type.mapping=domain.Data:persistence.DataEntity
生产者的财产,但这没有帮助。
你必须在两边都提供映射。
但是,您不应使用 JsonDeserializer
,而应使用 BytesDeserializer
和 BytesJsonMessageConverter
(只需添加一个作为 @Bean
,Boot 会将其连接到容器工厂)。
这样框架会自动转换成参数类型
再次参见 the documentation。
我正在使用 Spring Boot 使用 Kafka 将数据从一个应用程序发送到另一个应用程序。
我的设计使用一个接口来声明正在发送的数据:
package domain;
interface Data {
public String getData();
public void setData(String data);
}
制作人
在源应用程序中,我将此接口实现为数据库实体。
package persistence;
@Data
class DataEntity implements Data {
private String data; // lombok generates getter/setters
}
添加实体后,我想使用 KafkaTemplate
将其作为更新发送到 Kafka@Component
class DataPublisher implements ApplicationListener<DataEvent> {
@Autowired private KafkaTemplate<String,Data> template;
// I left out DataEvent which is a straightforward ApplicationEvent override
@EventListener(classes = DataEvent.class)
public void onApplicationEvent(DataEvent event) {
template.send("data", (Data) event.getSource());
}
}
// triggered by this call in a service
eventPublisher.publishEvent(new DataEvent(updatedData));
序列化是通过属性完成的
spring:
kafka:
consumer:
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties.spring.json.value.default.type: domain.Data
查看kafkacat输出,数据发送正常。
消费者
接收端我有
@KafkaListener(topics = "data")
public void dataUpdated(@Payload Data data) {
dataService.updateData(data);
}
这导致
Caused by: java.lang.IllegalArgumentException: The class 'persistence.DataEntity' is not in the trusted packages [...]
我完全理解 - 序列化器发送 persistence.DataEntity
object,但客户端期望 domain.Data
object。但这就是设计应该的样子;我希望客户端只知道域包,而不是它的 persistence
实现。 (作为附带问题,我在哪里可以看到这种类型 header?它不在编码的 json AFAICT 中,我错过了什么?)
所以问题是:如何强制 Spring JsonDeserializer
发送 domain.Data
作为序列化数据类型?
我确实在序列化程序 class 中找到了一个 TYPE_MAPPING 属性,但它唯一的文档是它 "add[s] type mappings to the type mapper: 'foo:com.Foo,bar:com.Bar'",它没有解释任何东西,我可以'找不到示例用法。
编辑:
我确实添加了
spring.kafka.producer.properties.spring.json.type.mapping=domain.Data:persistence.DataEntity
生产者的财产,但这没有帮助。
你必须在两边都提供映射。
但是,您不应使用 JsonDeserializer
,而应使用 BytesDeserializer
和 BytesJsonMessageConverter
(只需添加一个作为 @Bean
,Boot 会将其连接到容器工厂)。
这样框架会自动转换成参数类型
再次参见 the documentation。