Spring-kafka消费者异常中的序列化异常
Serialization Exception in Spring-kafka Consumer Exception
当向我的 SpringBoot Kafka 应用程序发送任何消息时,我遇到了序列化异常,这是日志。
2021-02-24 01:28:21.280 INFO 19249 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0
2021-02-24 01:28:21.281 INFO 19249 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651
2021-02-24 01:28:21.281 INFO 19249 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1614110301280
2021-02-24 01:28:21.294 INFO 19249 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: paGjVV-5RVyOatWyXOzBrQ
2021-02-24 01:28:21.365 INFO 19249 --- [ntainer#0-0-C-1] c.h.a.m.service.KafkaConsumer : #### -> Consumed message -> "22:26 Hello World!!"
2021-02-24 01:28:21.400 ERROR 19249 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) ~[spring-kafka-2.6.6.jar:2.6.6]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.6.6.jar:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427) ~[spring-kafka-2.6.6.jar:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.6.jar:2.6.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition myTopic-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[34, 50, 50, 58, 50, 54, 32, 72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33, 33, 32, 32, 74, 97, 105, 32, 83, 104, 114, 105, 32, 82, 97, 109, 33, 33, 32, 66, 97, 114, 97, 109, 98, 97, 97, 114, 32, 74, 97, 105, 32, 72, 111, 33, 33, 34]] from topic [myTopic]
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.hcl.anusheel.messagestream.request.dto.EntryObject` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('22:26 Hello World!!')
at [Source: (byte[])""22:26 Hello World!!""; line: 1, column: 1]
at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1455) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1081) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.deser.ValueInstantiator._createFromStringFallbacks(ValueInstantiator.java:371) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromString(StdValueInstantiator.java:323) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1408) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:176) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:166) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2079) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1555) ~[jackson-databind-2.11.4.jar:2.11.4]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:517) ~[spring-kafka-2.6.6.jar:2.6.6]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access00(Fetcher.java:130) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access00(Fetcher.java:1432) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1308) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1271) ~[spring-kafka-2.6.6.jar:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1162) ~[spring-kafka-2.6.6.jar:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.6.jar:2.6.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2021-02-24 01:28:21.412 ERROR 19249 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
如何解决这个问题?
这是 EntryObject.java class,这是传入请求对象的一部分,将被序列化并提供给 Kafka 主题,同样应该从 Kafka 消费者中检索以进行进一步处理.
public class EntryObject {
@NonNull
private String tradeId;
@NonNull
private int version;
@NonNull
private String counterPartyId;
@NonNull
private String bookId;
@JsonFormat(pattern = "dd/MM/yyyy")
@DateTimeFormat(pattern = "dd/MM/yyyy")
private LocalDate maturityDate;
@JsonFormat(pattern = "dd/MM/yyyy")
@DateTimeFormat(pattern = "dd/MM/yyyy")
private LocalDate createdDate;
private char expired;
public String getTradeId() {
return tradeId;
}
public void setTradeId(String tradeId) {
this.tradeId = tradeId;
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public String getCounterPartyId() {
return counterPartyId;
}
public void setCounterPartyId(String counterPartyId) {
this.counterPartyId = counterPartyId;
}
public String getBookId() {
return bookId;
}
public void setBookId(String bookId) {
this.bookId = bookId;
}
public LocalDate getMaturityDate() {
return maturityDate;
}
public void setMaturityDate(LocalDate maturityDate) {
this.maturityDate = maturityDate;
}
public LocalDate getCreatedDate() {
return createdDate;
}
public void setCreatedDate(LocalDate createdDate) {
this.createdDate = createdDate;
}
public char getExpired() {
return expired;
}
public void setExpired(char expired) {
this.expired = expired;
}
@Override
public String toString() {
return "EntryObject [tradeId=" + tradeId + ", version=" + version + ", counterPartyId=" + counterPartyId
+ ", bookId=" + bookId + ", maturityDate=" + maturityDate + ", createdDate=" + createdDate
+ ", expired=" + expired + "]";
}
}
这是我的 KafkaConsumerConfig.java class.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
public ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(groupId));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("foo");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("bar");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> headersKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("headers");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> partitionsKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("partitions");
}
public ConsumerFactory<String, EntryObject> entryObjectConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "entryObject");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(EntryObject.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, EntryObject> entryObjectKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, EntryObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(entryObjectConsumerFactory());
return factory;
}
}
这是我的 KafkaConsumer.java class
@Service
public class KafkaConsumer {
private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = "myTopic", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void receive(String message) throws IOException {
logger.info(String.format("#### -> Consumed message -> %s", message));
}
@KafkaListener(topics = "myTopic", containerFactory = "entryObjectKafkaListenerContainerFactory")
public void receive(EntryObject entryObject) throws IOException {
logger.info("received entryObject = '{}'", entryObject.toString());
}
}
如何解决这个异常?并顺利运行申请。
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of com.hcl.anusheel.messagestream.request.dto.EntryObject
(although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('22:26 Hello World!!')
Json 是 JSON 编码的字符串 "..."
。
Jackson 正在尝试为看起来像这样的入口对象找到一个构造函数...
public EntryObject(String data) { ... }
并没有这样的CTOR。
当向我的 SpringBoot Kafka 应用程序发送任何消息时,我遇到了序列化异常,这是日志。
2021-02-24 01:28:21.280 INFO 19249 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0
2021-02-24 01:28:21.281 INFO 19249 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651
2021-02-24 01:28:21.281 INFO 19249 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1614110301280
2021-02-24 01:28:21.294 INFO 19249 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: paGjVV-5RVyOatWyXOzBrQ
2021-02-24 01:28:21.365 INFO 19249 --- [ntainer#0-0-C-1] c.h.a.m.service.KafkaConsumer : #### -> Consumed message -> "22:26 Hello World!!"
2021-02-24 01:28:21.400 ERROR 19249 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) ~[spring-kafka-2.6.6.jar:2.6.6]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.6.6.jar:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427) ~[spring-kafka-2.6.6.jar:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.6.jar:2.6.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition myTopic-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[34, 50, 50, 58, 50, 54, 32, 72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33, 33, 32, 32, 74, 97, 105, 32, 83, 104, 114, 105, 32, 82, 97, 109, 33, 33, 32, 66, 97, 114, 97, 109, 98, 97, 97, 114, 32, 74, 97, 105, 32, 72, 111, 33, 33, 34]] from topic [myTopic]
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.hcl.anusheel.messagestream.request.dto.EntryObject` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('22:26 Hello World!!')
at [Source: (byte[])""22:26 Hello World!!""; line: 1, column: 1]
at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1455) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1081) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.deser.ValueInstantiator._createFromStringFallbacks(ValueInstantiator.java:371) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromString(StdValueInstantiator.java:323) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1408) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:176) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:166) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2079) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1555) ~[jackson-databind-2.11.4.jar:2.11.4]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:517) ~[spring-kafka-2.6.6.jar:2.6.6]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access00(Fetcher.java:130) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access00(Fetcher.java:1432) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1308) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1271) ~[spring-kafka-2.6.6.jar:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1162) ~[spring-kafka-2.6.6.jar:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.6.jar:2.6.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2021-02-24 01:28:21.412 ERROR 19249 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
如何解决这个问题?
这是 EntryObject.java class,这是传入请求对象的一部分,将被序列化并提供给 Kafka 主题,同样应该从 Kafka 消费者中检索以进行进一步处理.
public class EntryObject {
@NonNull
private String tradeId;
@NonNull
private int version;
@NonNull
private String counterPartyId;
@NonNull
private String bookId;
@JsonFormat(pattern = "dd/MM/yyyy")
@DateTimeFormat(pattern = "dd/MM/yyyy")
private LocalDate maturityDate;
@JsonFormat(pattern = "dd/MM/yyyy")
@DateTimeFormat(pattern = "dd/MM/yyyy")
private LocalDate createdDate;
private char expired;
public String getTradeId() {
return tradeId;
}
public void setTradeId(String tradeId) {
this.tradeId = tradeId;
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public String getCounterPartyId() {
return counterPartyId;
}
public void setCounterPartyId(String counterPartyId) {
this.counterPartyId = counterPartyId;
}
public String getBookId() {
return bookId;
}
public void setBookId(String bookId) {
this.bookId = bookId;
}
public LocalDate getMaturityDate() {
return maturityDate;
}
public void setMaturityDate(LocalDate maturityDate) {
this.maturityDate = maturityDate;
}
public LocalDate getCreatedDate() {
return createdDate;
}
public void setCreatedDate(LocalDate createdDate) {
this.createdDate = createdDate;
}
public char getExpired() {
return expired;
}
public void setExpired(char expired) {
this.expired = expired;
}
@Override
public String toString() {
return "EntryObject [tradeId=" + tradeId + ", version=" + version + ", counterPartyId=" + counterPartyId
+ ", bookId=" + bookId + ", maturityDate=" + maturityDate + ", createdDate=" + createdDate
+ ", expired=" + expired + "]";
}
}
这是我的 KafkaConsumerConfig.java class.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
public ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(groupId));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("foo");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("bar");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> headersKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("headers");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> partitionsKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("partitions");
}
public ConsumerFactory<String, EntryObject> entryObjectConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "entryObject");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(EntryObject.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, EntryObject> entryObjectKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, EntryObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(entryObjectConsumerFactory());
return factory;
}
}
这是我的 KafkaConsumer.java class
@Service
public class KafkaConsumer {
private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = "myTopic", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void receive(String message) throws IOException {
logger.info(String.format("#### -> Consumed message -> %s", message));
}
@KafkaListener(topics = "myTopic", containerFactory = "entryObjectKafkaListenerContainerFactory")
public void receive(EntryObject entryObject) throws IOException {
logger.info("received entryObject = '{}'", entryObject.toString());
}
}
如何解决这个异常?并顺利运行申请。
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of
com.hcl.anusheel.messagestream.request.dto.EntryObject
(although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('22:26 Hello World!!')
Json 是 JSON 编码的字符串 "..."
。
Jackson 正在尝试为看起来像这样的入口对象找到一个构造函数...
public EntryObject(String data) { ... }
并没有这样的CTOR。