卡夫卡消费者错误
Kafka Consumer Error
我正在使用 kafka 生产者和 Spring kafka 消费者。我正在使用 Json 序列化器和反序列化器。每当我尝试从主题中读取消费者中的消息时,我都会收到以下错误:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition fan_topic-0 at offset 154. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
我没有在生产者和消费者中配置任何关于 headers 的东西。我在这里错过了什么?
我相信您忽略了这样一个事实,即必须在 ConsumerFactory
上配置 JsonDeserializer
并使用适当的默认类型进行反序列化,而不是在 Kafka 属性中。
文档中提供了所有信息:https://docs.spring.io/spring-kafka/docs/2.1.7.RELEASE/reference/html/_reference.html#serdes
只是添加到上面的答案,
以下更改为我解决了。
config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
添加
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(String.class));
而不是
return new DefaultKafkaConsumerFactory<String, String>(config);
供参考,
deserialize
中的以下方法期望 headers 和“Assert.state..
”抛出 IllegalStateException
@Override
public T deserialize(String topic, Headers headers, byte[] data) {
JavaType javaType = this.typeMapper.toJavaType(headers);
if (javaType == null) {
Assert.state(this.targetType != null, "No type information in headers and no default type provided");
return deserialize(topic, data);
}
else {
try {
return this.objectMapper.readerFor(javaType).readValue(data);
}
catch (IOException e) {
throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) +
"] from topic [" + topic + "]", e);
}
}
}
我正在使用 kafka 生产者和 Spring kafka 消费者。我正在使用 Json 序列化器和反序列化器。每当我尝试从主题中读取消费者中的消息时,我都会收到以下错误:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition fan_topic-0 at offset 154. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
我没有在生产者和消费者中配置任何关于 headers 的东西。我在这里错过了什么?
我相信您忽略了这样一个事实,即必须在 ConsumerFactory
上配置 JsonDeserializer
并使用适当的默认类型进行反序列化,而不是在 Kafka 属性中。
文档中提供了所有信息:https://docs.spring.io/spring-kafka/docs/2.1.7.RELEASE/reference/html/_reference.html#serdes
只是添加到上面的答案,
以下更改为我解决了。
config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
添加
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(String.class));
而不是
return new DefaultKafkaConsumerFactory<String, String>(config);
供参考,
deserialize
中的以下方法期望 headers 和“Assert.state..
”抛出 IllegalStateException
@Override
public T deserialize(String topic, Headers headers, byte[] data) {
JavaType javaType = this.typeMapper.toJavaType(headers);
if (javaType == null) {
Assert.state(this.targetType != null, "No type information in headers and no default type provided");
return deserialize(topic, data);
}
else {
try {
return this.objectMapper.readerFor(javaType).readValue(data);
}
catch (IOException e) {
throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) +
"] from topic [" + topic + "]", e);
}
}
}