如何将 Java api KStream 转换为 Scala api KStream?
How to convert the Java api KStream to Scala api KStream?
我们的库有使用典型 kafka-stream 的数据处理器(Scala 代码):
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
移动到 kafka-stream-scala
的原因是每当我使用 groupBy、selectByKey 等函数时奇怪的 return 类型
(builder: StreamsBuilder, stream: KStream[String, Event]) =>
val wgCreatedStream = stream.groupBy((_,v) =>
v.payload match {
case x:WorkgroupCreated => x.id //this is a String
})
以这段代码为例。 StreamsBuilder
结果为 KGroupedStream[Nothing,Event]
当我使用这些导入时:
import org.apache.kafka.streams.scala.{ByteArrayWindowStore, StreamsBuilder}
import org.apache.kafka.streams.scala.kstream.{Grouped, KStream, Materialized}
import org.apache.kafka.streams.scala.Serdes.{String,Long}
return类型终于改成了KGroupedStream[String,Event]
我真正希望的是:
使用 kafka-stream-scala
而不重构我们的数据处理器
如果是,那就太棒了,尤其是。如果有例子!
如果不是……那将是一段痛苦的旅程。 :((但还是谢谢)
好吧,和同事聊天后,我马上找到了答案。 -___-
来自 org.apache.kafka.streams.scala.ImplicitConversions
、
有一个 wrapKStream()
将
streams.kstream.KStream
转换为 streams.scala.kstream.KStream
反之,直接调用inner
方法即可。 KTable 也一样。
我们的库有使用典型 kafka-stream 的数据处理器(Scala 代码):
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
移动到 kafka-stream-scala
的原因是每当我使用 groupBy、selectByKey 等函数时奇怪的 return 类型
(builder: StreamsBuilder, stream: KStream[String, Event]) =>
val wgCreatedStream = stream.groupBy((_,v) =>
v.payload match {
case x:WorkgroupCreated => x.id //this is a String
})
以这段代码为例。 StreamsBuilder
结果为 KGroupedStream[Nothing,Event]
当我使用这些导入时:
import org.apache.kafka.streams.scala.{ByteArrayWindowStore, StreamsBuilder}
import org.apache.kafka.streams.scala.kstream.{Grouped, KStream, Materialized}
import org.apache.kafka.streams.scala.Serdes.{String,Long}
return类型终于改成了KGroupedStream[String,Event]
我真正希望的是:
使用 kafka-stream-scala
而不重构我们的数据处理器
如果是,那就太棒了,尤其是。如果有例子!
如果不是……那将是一段痛苦的旅程。 :((但还是谢谢)
好吧,和同事聊天后,我马上找到了答案。 -___-
来自 org.apache.kafka.streams.scala.ImplicitConversions
、
有一个 wrapKStream()
将
streams.kstream.KStream
转换为 streams.scala.kstream.KStream
反之,直接调用inner
方法即可。 KTable 也一样。