Spring Boot Kafka class deserialization - not in the trusted package
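I know this problem is very common, but after trying different solutions I could not find one that works. When receiving messages from Kafka, I want to deserialize Strings as well as objects of my custom class. With String everything works fine, but with my class it does not. I added the trusted package to the consumer configuration (com.springmiddleware.entities is the package that contains my class):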
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.springmiddleware.entities");
    return props;
}
My application.yml file contains this:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: 'com.springmiddleware.entities'
I also added these lines to application.properties:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.springmiddleware.entities
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
But the following error keeps appearing:
org.apache.kafka.common.errors.SerializationException: Error
deserializing key/value for partition topic2-0 at offset 1. If needed,
please seek past the record to continue consumption. Caused by:
java.lang.IllegalArgumentException: The class
'com.springmiddleware.entities.Crime' is not in the trusted packages:
[java.util, java.lang]. If you believe this class is safe to
deserialize, please provide its name. If the serialization is only
done by a trusted source, you can also enable trust all (*).
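The trusted-packages list printed in the error contains only the defaults, java.util and java.lang, which suggests the configured package never reached the deserializer that raised the exception. The check behind the message can be pictured with a small plain-Java model. This is a simplified sketch, not Spring Kafka's actual code: the class name TrustedPackageSketch and its methods are made up for illustration, and only exact package matches and the trust-all value "*" are modelled.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Simplified model of a trusted-package check; NOT Spring Kafka's implementation. */
final class TrustedPackageSketch {

    // Defaults taken from the error message: [java.util, java.lang].
    private final Set<String> trusted =
            new LinkedHashSet<>(List.of("java.util", "java.lang"));

    /** Hypothetical helper: adds packages to the trusted set. */
    void addTrustedPackages(String... packages) {
        for (String p : packages) {
            trusted.add(p);
        }
    }

    /** Returns true if the class's package is trusted, or if "*" (trust all) is set. */
    boolean isTrusted(String className) {
        if (trusted.contains("*")) {
            return true;
        }
        int lastDot = className.lastIndexOf('.');
        String pkg = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trusted.contains(pkg);
    }

    public static void main(String[] args) {
        TrustedPackageSketch check = new TrustedPackageSketch();
        String crime = "com.springmiddleware.entities.Crime";

        // With defaults only, the class is rejected, as in the error above.
        System.out.println(check.isTrusted(crime)); // prints "false"

        // Once the package reaches the checker, the same class is accepted.
        check.addTrustedPackages("com.springmiddleware.entities");
        System.out.println(check.isTrusted(crime)); // prints "true"
    }
}
```

In this model the setting only helps if it is applied to the very instance that performs the check, which is the thing to verify in the real configuration.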
UPDATE
Receiver configuration:
@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.springmiddleware.entities");
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<>());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
UPDATE 2
Listener class (Receiver):
@KafkaListener(topics = "${app.topic.foo}")
@Service
public class Receiver {

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaHandler
    public void listen(@Payload Crime message) {
        System.out.println("Received " + message);
    }

    @KafkaHandler
    public void listen(@Payload String message) {
        System.out.println("Received " + message);
    }
}
Just use the overloaded JsonDeserializer constructor. From the documentation:
Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean useHeadersIfPresent (which is true by default).
The following example shows how to do so:
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
Your code:
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(Object.class, false));
}
Now use @KafkaListener at the class level:
@KafkaListener(topics = "myTopic")
@Service
public class MultiListenerBean {

    @KafkaHandler
    public void listen(Cat cat) {
        ...
    }

    @KafkaHandler
    public void listen(Hat hat) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void delete(Object obj) {
        ...
    }
}
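To picture what the class-level listener does with each record, here is a toy plain-Java model of dispatching by payload type, with a fallback that plays the role of the isDefault = true handler. It is only a sketch under assumptions: HandlerDispatchSketch, its nested Cat and Hat classes, and the returned strings are invented for illustration, and Spring's real handler resolution is more involved than this loop.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy model of class-level handler dispatch; NOT Spring's implementation. */
final class HandlerDispatchSketch {

    // Hypothetical payload types standing in for Cat and Hat.
    static final class Cat {
        final String name;
        Cat(String name) { this.name = name; }
    }

    static final class Hat {
        final String color;
        Hat(String color) { this.color = color; }
    }

    // One handler per payload type, checked in insertion order.
    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();

    // Fallback used when no typed handler matches (models isDefault = true).
    private final Function<Object, String> defaultHandler = o -> "default:" + o;

    HandlerDispatchSketch() {
        handlers.put(Cat.class, o -> "cat:" + ((Cat) o).name);
        handlers.put(Hat.class, o -> "hat:" + ((Hat) o).color);
    }

    /** Picks the first handler whose type matches the payload, else the default. */
    String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> e : handlers.entrySet()) {
            if (e.getKey().isInstance(payload)) {
                return e.getValue().apply(payload);
            }
        }
        return defaultHandler.apply(payload);
    }

    public static void main(String[] args) {
        HandlerDispatchSketch d = new HandlerDispatchSketch();
        System.out.println(d.dispatch(new Cat("Tom")));  // prints "cat:Tom"
        System.out.println(d.dispatch(new Hat("red")));  // prints "hat:red"
        System.out.println(d.dispatch("plain text"));    // prints "default:plain text"
    }
}
```

The point of the model is that a typed handler is only chosen when the deserialized payload is actually an instance of that type; anything else ends up in the default handler.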