如何将 Java api KStream 转换为 Scala api KStream?

How to convert the Java api KStream to Scala api KStream?

我们的库有使用典型 kafka-stream 的数据处理器(Scala 代码):

import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream

移动到 kafka-stream-scala 的原因是每当我使用 groupBy、selectByKey 等函数时奇怪的 return 类型

(builder: StreamsBuilder, stream: KStream[String, Event]) =>
 val wgCreatedStream = stream.groupBy((_,v) => 
  v.payload match {
   case x:WorkgroupCreated => x.id //this is a String
  })

以这段代码为例。 StreamsBuilder 结果为 KGroupedStream[Nothing,Event]

当我使用这些导入时:

import org.apache.kafka.streams.scala.{ByteArrayWindowStore, StreamsBuilder}
import org.apache.kafka.streams.scala.kstream.{Grouped, KStream, Materialized}
import org.apache.kafka.streams.scala.Serdes.{String,Long}

return类型终于改成了KGroupedStream[String,Event]

我真正希望的是: 使用 kafka-stream-scala 而不重构我们的数据处理器

如果是,那就太棒了,尤其是。如果有例子!
如果不是……那将是一段痛苦的旅程。 :((但还是谢谢)

好吧,和同事聊天后,我马上找到了答案。 -___-

来自 org.apache.kafka.streams.scala.ImplicitConversions
有一个 wrapKStream()
streams.kstream.KStream 转换为 streams.scala.kstream.KStream

反之,直接调用inner方法即可。 KTable 也一样。