Spring Cloud Kafka Streams deserialization issue with Caused by: java.lang.ClassCastException: ClassName cannot be cast to ClassName
I am trying to consume Avro messages from a topic using the Spring Cloud Kafka Streams binder, but I cannot get rid of this ClassCastException.
Here is my code:
    @Bean
    public Consumer<KStream<EventKey, Event>> process() {
        return input -> {
            input.peek(((key, value) -> logger.info("key value: " + key.toString() + " value: " + value.toString())));
            logger.info("Received:" + input);
        };
    }

    @Bean
    public Serde<EventKey> avroInSerde() {
        final SpecificAvroSerde<EventKey> avroInSerde = new SpecificAvroSerde<>();
        Map<String, Object> serdeProperties = new HashMap<>();
        return avroInSerde;
    }

    @Bean
    public Serde<Event> avroOutSerde() {
        final SpecificAvroSerde<Event> avroOutSerde = new SpecificAvroSerde<>();
        return avroOutSerde;
    }
Binder configuration:
    spring:
      application:
        name: ${applicaton-name}
      cloud:
        stream:
          function:
            definition: process
          bindings:
            process-in-0:
              destination: ${input-topic-name}
              contentType: application/Avro
            process-out-0:
              destination: ${enriched-topic-name}
              contentType: application/Avro
          binding-retry-interval: 30
          kafka:
            streams:
              binder:
                brokers: ${kafka-broker}
                application-id: ${consumer-group-name}
                auto-create-topics: false
                auto-add-partitions: false
                configuration:
                  processing.guarantee: at_least_once
                  auto.offset.reset: earliest
                  schema.registry.url: ${kafka-schema-registry}
                  auto-register-schema: false
                  security.protocol: SSL
                  useNativeEncoding: true
                  specific.avro.reader: true
Error:
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process(StreamTask.java:679)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: java.lang.ClassCastException: EventKey cannot be cast to EventKey
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
I tried both approaches described at https://spring.io/blog/2019/12/04/stream-processing-with-spring-cloud-stream-and-apache-kafka-streams-part-3-data-deserialization-and-serialization, but neither worked.
Am I missing something?
Caused by: java.lang.ClassCastException: EventKey cannot be cast to EventKey
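This is probably a class loader problem: the deserializer and the consumer bean were loaded by different class loaders. Are you using Spring DevTools?
With spring-kafka, you can avoid this by explicitly creating the consumer factory and injecting the deserializers into it.
With spring-cloud-stream (since version 3.0.6), you can provide a ClientFactoryCustomizer bean and inject the deserializer instances (defined as @Beans so that they use the same class loader).
Or stop using DevTools.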
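
To see why "EventKey cannot be cast to EventKey" is possible at all, here is a minimal, self-contained sketch in plain Java with no Spring or Kafka involved. The names ClassLoaderCastDemo, IsolatingLoader and newIsolatedEventKey are made up for this illustration, and the nested EventKey is only a stand-in for the generated Avro class. It assumes Java 9+ and that the compiled class files can be read as resources. The same class file is defined by a second class loader, which roughly mimics what happens when a restart-style loader and the base loader each load the class.

```java
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderCastDemo {

    /** Hypothetical stand-in for a generated Avro class such as EventKey. */
    public static class EventKey {
    }

    /** Defines classes itself from the application's class files instead of delegating. */
    static class IsolatingLoader extends ClassLoader {
        IsolatingLoader() {
            super(null); // no application parent, so EventKey is never delegated upward
        }

        @Override
        protected Class<?> findClass(String name) throws ClassNotFoundException {
            String path = name.replace('.', '/') + ".class";
            try (InputStream in = ClassLoaderCastDemo.class.getClassLoader().getResourceAsStream(path)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                byte[] bytes = in.readAllBytes();
                return defineClass(name, bytes, 0, bytes.length);
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Creates an EventKey instance whose class was defined by a separate loader. */
    static Object newIsolatedEventKey() {
        try {
            Class<?> isolated = new IsolatingLoader().loadClass(EventKey.class.getName());
            return isolated.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Object foreign = newIsolatedEventKey();
        // Same fully qualified name, but a different Class object.
        System.out.println("same name:  " + foreign.getClass().getName().equals(EventKey.class.getName()));
        System.out.println("same class: " + (foreign.getClass() == EventKey.class));
        try {
            EventKey key = (EventKey) foreign;
            System.out.println("cast succeeded: " + key);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getMessage());
        }
    }
}
```

The JVM identifies a class by its name plus its defining class loader, so the cast fails even though both sides print the same name. That is the situation the suggestions above try to avoid by making sure the deserializer and the consumer code see the class through the same loader.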