此错误处理程序无法直接处理“SerializationException”;请考虑配置 'ErrorHandlingDeserializer'

This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer'

生产者属性

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

消费者属性

spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.group-id=user-group
server.port=8085

消费者服务

@Service
public class UserConsumerService {

    @KafkaListener(topics = { "user-topic" })
    public void consumerUserData(User user) {
        System.out.println("Users Age Is: " + user.getAge() + " Fav Genre " + user.getFavGenre());
    }
}

生产者服务

@Service
public class UserProducerService {

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void sendUserData(User user) {
        kafkaTemplate.send("user-topic", user.getName(), user);
    }
}

创建主题的Producer Config

    @Configuration public class KafkaConfig {
    
        @Bean
        public NewTopic topicOrder() {
            return TopicBuilder.name("user-topic").partitions(2).replicas(1).build();
        } 
}

生产者运行良好,但消费者给出类似

的错误
2021-12-06 21:45:50.299 ERROR 4936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an

'ErrorHandlingDeserializer' in the value and/or key deserializer at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:149) ~[spring-kafka-2.8.0.jar:2.8.0] DefaultErrorHandler.java:149 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1760) ~[spring-kafka-2.8.0.jar:2.8.0] KafkaMessageListenerContainer.java:1760 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1283) ~[spring-kafka-2.8.0.jar:2.8.0] KafkaMessageListenerContainer.java:1283 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na] Executors.java:539 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na] FutureTask.java:264 at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na] Thread.java:833 Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition user-topic-0 at offset 1. If needed, please seek past the record to continue consumption. at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429) ~[kafka-clients-3.0.0.jar:na] Fetcher.java:1429 at org.apache.kafka.clients.consumer.internals.Fetcher.access00(Fetcher.java:134) ~[kafka-clients-3.0.0.jar:na] Fetcher.java:134 at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652) ~[kafka-clients-3.0.0.jar:na] Fetcher.java:1652 at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access00(Fetcher.java:1488) ~[kafka-clients-3.0.0.jar:na] Fetcher.java:1488 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721) ~[kafka-clients-3.0.0.jar:na] Fetcher.java:721 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672) ~[kafka-clients-3.0.0.jar:na] Fetcher.java:672 at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.0.0.jar:na] KafkaConsumer.java:1277 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.0.0.jar:na] KafkaConsumer.java:1238 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.0.jar:na] KafkaConsumer.java:1211 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1507) ~[spring-kafka-2.8.0.jar:2.8.0] KafkaMessageListenerContainer.java:1507 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1497) ~[spring-kafka-2.8.0.jar:2.8.0] KafkaMessageListenerContainer.java:1497 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1325) ~[spring-kafka-2.8.0.jar:2.8.0] KafkaMessage

如果你能提供帮助,我会很高兴,因为我是 kafka 的新手,正在尝试找出为什么会出现此错误

错误消息没有告诉您任何信息吗?

This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer

查看文档:https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer

When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this problem, the ErrorHandlingDeserializer has been introduced. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a null value and a DeserializationException in a header that contains the cause and the raw bytes. When you use a record-level MessageListener, if the ConsumerRecord contains a DeserializationException header for either the key or value, the container’s ErrorHandler is called with the failed ConsumerRecord. The record is not passed to the listener.

You can use the DefaultKafkaConsumerFactory constructor that takes key and value Deserializer objects and wire in appropriate ErrorHandlingDeserializer instances that you have configured with the proper delegates. Alternatively, you can use consumer configuration properties (which are used by the ErrorHandlingDeserializer) to instantiate the delegates. The property names are ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS and ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS. The property value can be a class or class name. The following example shows how to set these properties:

.. // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);

带引导:

...
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
...