Spring Cloud Kafka Stream: Can't get WindowedAggregateSessions working with custom Serdes
I'm fairly new to Kafka Streams and Spring Cloud Stream, and I'm having trouble getting the windowed aggregation functionality working.

What I'm trying to do:
- take my initial stream of UserInteractionEvents and group them by userProjectId (a String)
- create windowed sessions of these events with a 15-minute inactivity gap
- aggregate those windowed sessions into a custom Session object
- then map those Session objects into another custom UserSession object

My code looks like this:
@EnableBinding(KafkaStreamsProcessor::class)
class SessionProcessorApplication {

    @StreamListener("input")
    @SendTo("output")
    fun process(input: KStream<*, UserInteractionEvent>): KStream<*, UserSession> {
        return input
            .groupBy({ _, v -> v.userProjectId }, Serialized.with(Serdes.String(), UserInteractionEventSerde()))
            .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(15)))
            .aggregate(
                Initializer<Session>(::Session),
                Aggregator<String, UserInteractionEvent, Session> { _, event, session ->
                    // mutate the session's interaction list; a bare
                    // `session.interactions + event.interaction` would discard its result
                    session.interactions.add(event.interaction)
                    session
                },
                Merger<String, Session> { _, session1, session2 -> Session.merge(session1, session2) },
                Materialized.`as`<String, Session, SessionStore<Bytes, ByteArray>>("windowed-sessions")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(SessionSerde()))
            .toStream()
            .map { windowed, session ->
                KeyValue(windowed.key(),
                    UserSession(windowed.key(),
                        session.interactions,
                        // window start/end are epoch milliseconds, not seconds
                        Instant.ofEpochMilli(windowed.window().start()),
                        Instant.ofEpochMilli(windowed.window().end())))
            }
    }
}
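For context, the custom `SessionSerde` referenced above follows the standard Kafka `Serde` contract. A minimal sketch of how such a Serde is typically written is below; this is a hypothetical illustration assuming a Jackson-based JSON format, since the real `Session` fields and wire format live elsewhere in the project:

```kotlin
// Hypothetical sketch of a custom Serde for the Session type, assuming
// Jackson JSON serialization. The actual SessionSerde in the project may differ.
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serializer

class SessionSerializer : Serializer<Session> {
    private val mapper = jacksonObjectMapper()
    override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {}
    override fun serialize(topic: String?, data: Session?): ByteArray? =
        data?.let { mapper.writeValueAsBytes(it) }
    override fun close() {}
}

class SessionDeserializer : Deserializer<Session> {
    private val mapper = jacksonObjectMapper()
    override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {}
    override fun deserialize(topic: String?, data: ByteArray?): Session? =
        data?.let { mapper.readValue(it, Session::class.java) }
    override fun close() {}
}

class SessionSerde : Serde<Session> {
    override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {}
    override fun serializer(): Serializer<Session> = SessionSerializer()
    override fun deserializer(): Deserializer<Session> = SessionDeserializer()
    override fun close() {}
}
```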
I seem to be having a problem with the aggregation step: I'm seeing a class cast exception when it tries to flush the windowed session store, and I'm not sure where to go from here. If anyone could point out where I've gone wrong, or point me to documentation that covers session windows with custom serdes, I'd really appreciate it!

Many thanks!

Full stack trace below:
Exception in thread "default-dc0af3aa-8d8d-4b51-b0de-cdeb2dd83db6-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store windowed-sessions
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:327)
at org.apache.kafka.streams.processor.internals.StreamTask.run(StreamTask.java:307)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:302)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:292)
at org.apache.kafka.streams.processor.internals.AssignedTasks.apply(AssignedTasks.java:87)
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:452)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:381)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:310)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: [B). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:91)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:176)
at org.apache.kafka.streams.state.internals.CachingSessionStore.access[=13=]0(CachingSessionStore.java:38)
at org.apache.kafka.streams.state.internals.CachingSessionStore.apply(CachingSessionStore.java:88)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
at org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
at org.apache.kafka.streams.state.internals.MeteredSessionStore.flush(MeteredSessionStore.java:165)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
... 14 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:90)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
... 45 more
My configuration:

spring.cloud.stream.kafka.streams.bindings:
  default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  input:
    consumer:
      valueSerde: com.teckro.analytics.UserInteractionEventSerde
  output:
    producer:
      valueSerde: com.teckro.analytics.UserSessionSerde
spring.cloud.stream.bindings:
  input:
    destination: test-interaction
    consumer:
      headerMode: raw
  output:
    destination: test-session
    producer:
      headerMode: raw
I see a few problems with your configuration.

The default serdes should be configured like this instead:

spring.cloud.stream.kafka.streams.binder.configuration:
  default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings:
  input:
    consumer:
      valueSerde: com.teckro.analytics.UserInteractionEventSerde
  output:
    producer:
      valueSerde: com.teckro.analytics.UserSessionSerde
It looks like you are using native serdes for all de/serialization, so you need to enable that in the configuration as well. By default, the binder performs the input/output serialization itself.

spring.cloud.stream.bindings:
  input:
    destination: test-interaction
    consumer:
      useNativeDecoding: true
  output:
    destination: test-session
    producer:
      useNativeEncoding: true
If the problem persists, please create a small sample project on GitHub and share it with us, and we'll take a look.