Serde error in Kafka Streams process with Spring Cloud Stream
I am trying to process some Kafka records with Spring Cloud Stream 3.0.3.RELEASE, but I am having trouble with the Serdes configuration: the pipeline errors out as soon as a record enters the stream.
This is the stack trace:
30-03-2020 19:28:33 ERROR org.apache.kafka.streams.KafkaStreams [application-local,,,]: stream-client [joinPriorityData-applicationId-1302980a-a016-4167-9c0b-750ffb5d107a] All stream threads have died. The instance will be in error state and should be closed.
Exception in thread "joinPriorityData-applicationId-1302980a-a016-4167-9c0b-750ffb5d107a-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=moaii.security.pe.incidence.queue, partition=0, offset=18108, stacktrace=org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.vcp.moaii.cep.dto.IncidenceState). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:116)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:363)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:425)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:912)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: java.lang.ClassCastException: class com.vcp.moaii.cep.dto.IncidenceState cannot be cast to class [B (com.vcp.moaii.cep.dto.IncidenceState is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:163)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:103)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 35 more
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:380)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:425)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:912)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.vcp.moaii.cep.dto.IncidenceState). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:116)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:363)
... 5 more
Caused by: java.lang.ClassCastException: class com.vcp.moaii.cep.dto.IncidenceState cannot be cast to class [B (com.vcp.moaii.cep.dto.IncidenceState is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:163)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:103)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 35 more
This is my function:
@Bean
public Function<KTable<String, IncidenceItem>, KStream<String, ?>> joinPriorityData() {
    return incidenceStream -> incidenceStream
            .toStream()
            .filter((key, value) -> filterZoneTypeAndPriority(value))
            .selectKey((key, value) -> value.getsInc())
            .mapValues((readOnlyKey, value) -> stateMapper.toIncidendeState(value, null));
}
And this is my application.yml:
spring.json.value.default.type: RawAccounting
spring.cloud.stream:
  function.definition: joinPriorityData
  bindings:
    joinPriorityData-in-0:
      destination: moaii.security.pe.incidence.queue
      consumer.valueSerde: IncidenceItemSerde
    joinPriorityData-out-0:
      destination: moaii.security.pe.incidence.state
      producer.valueSerde: IncidenceItemSerde
  kafka:
    binder:
      configuration:
        auto.commit.interval.ms: 100
        auto.offset.reset: latest
    streams:
      binder:
        applicationId: moaii-cep
        content-type: application/json
        configuration:
          default:
            key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            value.serde: org.springframework.kafka.support.serializer.JsonSerde
          useNativeDecoding: true
I have tried many different configurations in many places, but none of them seems to work. As you can see in the stack trace, the default value Serde is ignored and a ByteArray one is used instead.
I have also created some custom Serdes, declared them as beans, and used them in the configuration file as shown, but with the same result.
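For reference, a custom Serde bean of the kind described might look like the following minimal sketch. The MoaiiSerdes and IncidenceItemSerde names come from the binder log further down; extending spring-kafka's JsonSerde this way is an assumption about how they were written:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonSerde;

import com.vcp.moaii.cep.dto.IncidenceItem;

@Configuration
public class MoaiiSerdes {

    // Subclassing JsonSerde pins the target type, so spring-kafka can
    // resolve IncidenceItem from the generic superclass instead of
    // relying on type headers.
    public static class IncidenceItemSerde extends JsonSerde<IncidenceItem> {
    }

    @Bean
    public IncidenceItemSerde incidenceItemSerde() {
        return new IncidenceItemSerde();
    }
}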
In any case, I have the feeling that other parts of the configuration are being ignored as well. For example, I can see how the KTable skips some old records with null keys and then fails when reading the first non-null record, even though I have auto.offset.reset: latest.
I cannot tell whether this is a Kafka issue or a Spring issue, but I have not managed to fix it.
Edit:
In this log you can see how the binder picks up the correct Serde for the inbound binding, but not for the outbound one:
31-03-2020 11:54:24 INFO o.s.c.s.b.k.streams.KafkaStreamsFunctionProcessor [application-local,,,]: Key Serde used for joinPriorityData-in-0: org.apache.kafka.common.serialization.Serdes$StringSerde
31-03-2020 11:54:24 INFO o.s.c.s.b.k.streams.KafkaStreamsFunctionProcessor [application-local,,,]: Value Serde used for joinPriorityData-in-0: com.vcp.moaii.cep.broker.serde.MoaiiSerdes$IncidenceItemSerde
31-03-2020 11:54:27 INFO o.s.c.stream.binder.kafka.streams.KStreamBinder [application-local,,,]: Key Serde used for (outbound) moaii.security.pe.incidence.state: org.apache.kafka.common.serialization.Serdes$StringSerde
31-03-2020 11:54:27 INFO o.s.c.stream.binder.kafka.streams.KStreamBinder [application-local,,,]: Value Serde used for (outbound) moaii.security.pe.incidence.state: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
As you can see, the binding "joinPriorityData-out-0" is not even mentioned for the outbound.
IncidenceState is being loaded twice, with two different class loaders: once with the application (system) class loader and once with the bootstrap class loader.
This is a Kafka issue. Check this
https://discuss.kotlinlang.org/t/classloading-error-with-kafka-streams/4547
and this https://youtrack.jetbrains.com/issue/KT-24966
There are workarounds.
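The workaround discussed in those threads amounts to clearing the thread context class loader before the Kafka Streams client is created, so that Kafka falls back to the loader that actually loaded the application classes. A minimal sketch in plain Kafka Streams terms (where to hook this in under Spring Cloud Stream is left as an assumption):

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

public class StreamsBootstrap {

    // Kafka resolves classes through the context class loader; when that
    // loader differs from the one that loaded the application classes,
    // the same class is effectively loaded twice and casts between the
    // two copies fail. Clearing it forces Kafka to fall back to the
    // loader of its own classes.
    public static KafkaStreams start(Topology topology, Properties props) {
        Thread.currentThread().setContextClassLoader(null);
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
        return streams;
    }
}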
I wonder if the issue is related to the way your configuration and function method are set up. Try the following function and configuration and see if that makes a difference.
@Bean
public Function<KTable<String, IncidenceItem>, KStream<String, IncidenceState>> joinPriorityData() {
    // same as your original code
}
Note that I added the parameterized type KStream<String, IncidenceState> for the outbound. This is necessary for the binder to correctly infer the Serde types to use. Assuming both IncidenceItem and IncidenceState are JSON-friendly objects, you can leave out any default Serdes. However, if your internal logic needs to rely on those Serdes, you still need to provide them. Below is the modified configuration; I removed unnecessary properties and rearranged others.
spring.json.value.default.type: RawAccounting
spring.cloud.stream:
  function.definition: joinPriorityData
  bindings:
    joinPriorityData-in-0:
      destination: moaii.security.pe.incidence.queue
    joinPriorityData-out-0:
      destination: moaii.security.pe.incidence.state
  kafka:
    streams:
      binder:
        applicationId: moaii-cep
        configuration:
          auto.commit.interval.ms: 100
          auto.offset.reset: latest
          default:
            key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            value.serde: org.springframework.kafka.support.serializer.JsonSerde
I am not sure that you need both the regular Kafka binder and the Kafka Streams binder; from the setup you shared, it does not look like it. So I moved all the configuration under the Kafka Streams binder configuration.
See if these changes make any difference.
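If the defaults still do not take effect, the Kafka Streams binder also accepts per-binding Serde properties; note that these live under spring.cloud.stream.kafka.streams.bindings, not under the plain spring.cloud.stream.bindings section where the original configuration placed them. A hedged sketch, reusing the custom Serde class name from the log above:

spring.cloud.stream.kafka.streams.bindings:
  joinPriorityData-in-0:
    consumer:
      valueSerde: com.vcp.moaii.cep.broker.serde.MoaiiSerdes$IncidenceItemSerde
  joinPriorityData-out-0:
    producer:
      valueSerde: org.springframework.kafka.support.serializer.JsonSerde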