Spring Kafka: No type information in headers and no default type provided
I have a Spring Boot application that defines:
- a REST controller that writes to a Kafka topic, STREAM_TOPIC_IN_QQQ
- a KafkaListener that reads from STREAM_TOPIC_IN_QQQ (groupId="bar") and logs
- a KStream that peeks at the topic and logs it, converts the record to another type, and writes it to STREAM_TOPIC_OUT_QQQ
- another KafkaListener that reads from STREAM_TOPIC_OUT_QQQ.
(I have been changing the suffix to avoid any possible confusion, and creating the topics manually; otherwise I get a warning, STREAM_TOPIC_IN_xxx=LEADER_NOT_AVAILABLE, and the stream does not run for a minute or so.)
The first listener and the stream appear to be working, but when the listener on STREAM_OUT_TOPIC tries to deserialize the message, I get the exception below. I am providing the serde in the stream with Produced.with. What do I need to do for the listener to know the type to deserialize to?
Log:
11 Mar 2019 14:34:00,194 DEBUG [KafkaMessageController [] http-nio-8080-exec-1] Sending a Kafka Message
11 Mar 2019 14:34:00,236 INFO [KafkaConfig [] kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1] -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
11 Mar 2019 14:34:00,241 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: ConsumerRecord: {}ConsumerRecord(topic = STREAM_TOPIC_IN_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = com.teramedica.kafakaex001web.model.Greeting@7b6c8fcc)
11 Mar 2019 14:34:00,243 INFO [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1-producer] Cluster ID: y48IEZaGQWKcWDVGf4mD6g
11 Mar 2019 14:34:00,367 ERROR [LoggingErrorHandler [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] Error while processing: ConsumerRecord(topic = STREAM_TOPIC_OUT_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 48, headers = RecordHeaders(headers = [RecordHeader(key = springDeserializerExceptionValue, value = [ REDACTED ])], isReadOnly = false), key = 1, value = null)
org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is java.lang.IllegalStateException: No type information in headers and no default type provided
at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2.deserializationException(ErrorHandlingDeserializer2.java:204) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
Here is the configuration:
REST (spring mvc):
@RequestMapping("/greeting")
public Greeting greeting(@RequestParam(value = "name", defaultValue = "World") String name) {
    Greeting gr = new Greeting(counter.incrementAndGet(), String.format(msgTemplate, name));
    this.kafkaTemplate.send(K9000Consts.STREAM_TOPIC_IN, "1", gr);
    logger.debug("Sending a Kafka Message");
    return gr;
}
Kafka config (spring-kafka):
@Bean
public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) {
    KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN);
    stream.peek((k, greeting) -> {
            logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent());
        })
        .map((k, v) -> new KeyValue<>(k, new GreetingResponse(v)))
        .to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class)));
    return stream;
}
@KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId = "oofda", errorHandler = "myTopicErrorHandler")
public void listenForGreetingResponse(ConsumerRecord<String, GreetingResponse> cr) throws Exception {
    logger.info("STREAM_OUT_TOPIC Listener : {}" + cr.toString());
}

@KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar")
public void listenForGreetingResponses(ConsumerRecord<String, Greeting> cr) throws Exception {
    logger.info("STREAM_IN_TOPIC Listener: ConsumerRecord: {}" + cr.toString());
}
application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: foo
      auto-offset-reset: latest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      properties:
        spring.json.trusted.packages: com.teramedica.kafakaex001web.model
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    streams:
      application-id: kafka9000-v0.1
      properties: # properties not explicitly handled by KafkaProperties.streams
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
        spring.json.trusted.packages: com.teramedica.kafakaex001web.model
Specifically...
JsonDeserializer.VALUE_DEFAULT_TYPE: Fallback type for deserialization of values if no header information is present.
That property is spring.json.value.default.type.
You can also set spring.json.use.type.headers (default true) to prevent even looking for headers.
The deserializer automatically trusts the package of the default type, so it is not necessary to add it there.
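For example, a minimal application.yml sketch (it reuses the GreetingResponse class from the question above purely for illustration; substitute your own payload class):

```yaml
spring:
  kafka:
    consumer:
      properties:
        # Fallback type used when the record carries no type header
        spring.json.value.default.type: com.teramedica.kafakaex001web.model.GreetingResponse
        # Set to false to skip looking at type headers entirely (default: true)
        spring.json.use.type.headers: false
```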
EDIT
However, see also Spring Messaging Message Conversion.
With a BytesDeserializer and a BytesJsonMessageConverter, the framework will use the method parameter type as the conversion target.
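A minimal sketch of that converter-based approach (illustrative only, not the poster's code; it assumes Spring Boot auto-configuration, which picks up a single converter bean and applies it to the @KafkaListener containers):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.converter.BytesJsonMessageConverter;

@Configuration
public class ConverterConfig {

    // With this converter in place, the listener method's parameter type
    // (e.g. GreetingResponse) becomes the JSON conversion target, so no
    // type headers or default type are needed on the deserializer side.
    @Bean
    public BytesJsonMessageConverter messageConverter() {
        return new BytesJsonMessageConverter();
    }
}
```

Pair this with spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.BytesDeserializer in application.yml.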
"Answering" my own question mostly to consolidate the information from the comments from @GaryRussell, but basically he provided the best answer. In short, I did the following:
- Set the consumer deserializers to StringDeserializer
- Added a messageConverter bean as a StringJsonMessageConverter
- In the KafkaListener-annotated methods, just use the expected Payload type
- If using ConsumerRecord in a KafkaListener-annotated method, do NOT expect it to be the Payload type. It will now be a String (because the message converter, not the deserializer, is doing the work).
One other thing: by default, when using Spring Boot auto-configuration, simply adding a messageConverter also adds it to the auto-configured kafkaTemplate. That does not seem to matter when calling kafkaTemplate.send(K9000Consts.STREAM_TOPIC_IN, "1", greeting), but I think it could become a problem if send(Message) is used.
Below is a working configuration, in that I get the expected messages with minimal configuration.
application.yml:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: foo
      auto-offset-reset: latest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      properties:
        spring.json.trusted.packages: com.teramedica.kafakaex001web.model
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    streams:
      application-id: kafka9000-v0.1
      properties: # properties not explicitly handled by KafkaProperties.streams
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
        spring.json.trusted.packages: com.teramedica.kafakaex001web.model
Kafka config:
@Bean RecordMessageConverter messageConverter() { return new StringJsonMessageConverter(); }
...
@Bean
public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) {
    KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN);
    stream.peek((k, greeting) -> {
            logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent());
        })
        .map((k, v) -> new KeyValue<>(k, new GreetingResponse(v)))
        .to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class)));
    return stream;
}

@KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId = "oofda", errorHandler = "myTopicErrorHandler")
public void listenForGreetingResponse(GreetingResponse gr) throws Exception {
    // logger.info("STREAM_OUT_TOPIC Listener : {}" + cr.toString());
    logger.info("STREAM_OUT_TOPIC Listener : GreetingResponse is {}" + gr);
}

@KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar")
public void listenForGreetingResponses(@Payload Greeting gr,
        ConsumerRecord<String, String> record, // <--- NOTE: String, NOT Greeting
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) throws Exception {
    //logger.info("STREAM_IN_TOPIC Listener: ConsumerRecord: {}" + cr.toString());
    logger.info("STREAM_IN_TOPIC Listener: Greeting: {}", gr.getContent());
    logger.info("STREAM_IN_TOPIC Listener: From Headers: topic: {}, partition: {}, key: {}", topic, partition, key);
    logger.info("STREAM_IN_TOPIC Listener:: From Record: topic: {}, parition: {}, key: {}",
            record.topic(), record.partition(), record.key());
    logger.info("STREAM_IN_TOPIC Listener:: record value: {}, class: {}", record.value(), record.value().getClass());
}

@Bean
public KafkaListenerErrorHandler myTopicErrorHandler() {
    return (m, e) -> {
        logger.error("Got an error {}", e.getMessage());
        return "some info about the failure";
    };
}
The output for one message is:
13 Mar 2019 09:56:57,884 DEBUG [KafkaMessageController [] http-nio-8080-exec-1] Sending a Kafka Message
13 Mar 2019 09:56:57,913 INFO [KafkaConfig [] kafka9000-v0.1-b0589cc5-2fab-4b72-81f7-b0d5488c7478-StreamThread-1] -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
13 Mar 2019 09:56:57,919 INFO [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0589cc5-2fab-4b72-81f7-b0d5488c7478-StreamThread-1-producer] Cluster ID: 8nREAmTCS0SZT-NzWsCacQ
13 Mar 2019 09:56:57,919 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: Greeting: Hello, World!
13 Mar 2019 09:56:57,920 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: Record: ConsumerRecord(topic = STREAM_TOPIC_IN_SSS, partition = 0, offset = 23, CreateTime = 1552489017878, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 116, 101, 114, 97, 109, 101, 100, 105, 99, 97, 46, 107, 97, 102, 97, 107, 97, 101, 120, 48, 48, 49, 119, 101, 98, 46, 109, 111, 100, 101, 108, 46, 71, 114, 101, 101, 116, 105, 110, 103])], isReadOnly = false), key = 1, value = {"id":1,"content":"Hello, World!"})
13 Mar 2019 09:56:57,920 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: From Headers: topic: STREAM_TOPIC_IN_SSS, partition: 0, key: 1
13 Mar 2019 09:56:57,920 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener:: From Record: topic: STREAM_TOPIC_IN_SSS, parition: 0, key: 1
13 Mar 2019 09:56:57,921 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener:: record value: {"id":1,"content":"Hello, World!"}, class: class java.lang.String
13 Mar 2019 09:56:58,030 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] STREAM_OUT_TOPIC Listener : GreetingResponse id: 1000, response: Hello, World!, yourself
This is not an answer, but it may help people landing here from a search engine.
If you hit this exception while running a KafkaStreams application, check:
- Have you registered jsonSerde in all the necessary places in the DSL?
- Have you provided jsonSerde in the state store instantiation?
Note 1: Make sure you have initialized jsonSerde as described below:
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
Serializer<JsonNode> jsonSerializer = new JsonSerializer();
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
Note 2: The most common mistake:
import org.springframework.kafka.support.serializer.JsonSerde;
new JsonSerde<JsonNode>(); // This is wrong
In my case, I had already published messages of a different type to the Kafka topic and then got this exception.
To fix it, I created a new topic and published the messages there. I then started the consumer on that topic, and it worked fine.
I ran into the same problem, and here is how I fixed it.
You have to set the following property to the class you want to deserialize to:
spring.json.value.default.type=com.something.model.TransactionEventPayload
I set the properties on the KafkaListener like this:
@KafkaListener(topics = "topic", groupId = "myGroupId", properties = {"spring.json.value.default.type=com.something.model.TransactionEventPayload"})
public void consumeTransactionEvent(@Payload TransactionEventPayload payload,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
    // ...
}
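Alternatively (a sketch of the same property applied globally rather than per listener; TransactionEventPayload is this answer's example class, substitute your own), the default type can be set once for all listeners in application.yml:

```yaml
spring:
  kafka:
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.value.default.type: com.something.model.TransactionEventPayload
```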
This exception is thrown by org.springframework.kafka.support.serializer.JsonDeserializer, which requires the type information to be included in a special type header, or provided to the @KafkaListener via the spring.json.value.default.type configuration property.
Here is how I solved this in Spring Boot 2.5.3:
- Add a ByteArrayJsonMessageConverter to the context:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter;
import org.springframework.kafka.support.converter.JsonMessageConverter;

@Configuration
public class JsonMessageConverterConfig {
    @Bean
    public JsonMessageConverter jsonMessageConverter() {
        return new ByteArrayJsonMessageConverter();
    }
}
- Set app.kafka.producer.value-serializer and app.kafka.consumer.value-deserializer:
app.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
app.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
- Now you can disable serialization of the TypeId header:
spring.kafka.producer.properties.spring.json.add.type.headers=false