无法在 kafka 流上应用 selectKey

Not able to apply selectKey on kafka stream

我正在尝试使用 kafka 流编写一​​个简单的过滤逻辑,但每次使用 selectKey 方法都会出错,无法进一步处理。

val builder: StreamsBuilder = new StreamsBuilder
    val textLines: KStream[String, String] = builder.stream[String, String](inputTopic)

    val filteredValue: KStream[String, String] =  textLines.
    filter((key: String, value: String) => value.contains(","))
     .selectKey[String]((key: String, value: String) => value.split(",")(0).toLowerCase)

错误:

overloaded method value selectKey with alternatives: (x: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: String, _ <: String],x: org.apache.kafka.streams.kstream.Named)org.apache.kafka.streams.kstream.KStream[String,String] <and> (x: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: String, _ <: String])org.apache.kafka.streams.kstream.KStream[String,String] cannot be applied to ((String, String) ⇒ String)

在 Scala 中,您有时需要指定确切的类型绑定,因为类型推断不会像您在 Scala 中习惯的那样工作。 当您发现这些键入错误时,您可以应用的经验法则是用新的显式键入的 val 分隔行,直到找到需要指定类型的位置。

类似问题及解决方案:

The compiler need some help to infer the correct type for the aggregator parameter.

To make it compile you can try:

val store: Materialized[String, String, KeyValueStore[Bytes, Array[Byte]]] = ???

private def save(pea: KStream[String, String]): Unit = {
  val aggregator: Aggregator[String, String, String] = (_, _, value: String) => value
  pea
    .groupByKey()
    .aggregate(() => """{folder: ""}""",
      aggregator,
      store)
}

来自 here.

是 Java 中的另一个,它也说明了 Kafka Streams 中严格类型的不同风格:

Try moving generic types specification after as method:

val state: KTable[String, String]  = builder
    .table[String, String]("BARY-PATH", Materialized.as[String, String,KeyValueStore[org.apache.kafka.common.utils.Bytes, Array[Byte]]]("PATH-STORE"))

正如您从 Java 签名中看到的那样,对于静态方法,您应该为方法指定泛型类型而不是 class.

您可能会发现许多其他示例,甚至在 Kafka Streams 之外,都是关于 Scala 中显式类型需求的。