无法在 kafka 流上应用 selectKey
Not able to apply selectKey on kafka stream
我正在尝试使用 kafka 流编写一个简单的过滤逻辑,但每次使用 selectKey 方法都会出错,无法进一步处理。
val builder: StreamsBuilder = new StreamsBuilder
val textLines: KStream[String, String] = builder.stream[String, String](inputTopic)
val filteredValue: KStream[String, String] = textLines.
filter((key: String, value: String) => value.contains(","))
.selectKey[String]((key: String, value: String) => value.split(",")(0).toLowerCase)
错误:
overloaded method value selectKey with alternatives: (x: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: String, _ <: String],x: org.apache.kafka.streams.kstream.Named)org.apache.kafka.streams.kstream.KStream[String,String] <and> (x: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: String, _ <: String])org.apache.kafka.streams.kstream.KStream[String,String] cannot be applied to ((String, String) ⇒ String)
在 Scala 中,您有时需要指定确切的类型绑定,因为类型推断不会像您在 Scala 中习惯的那样工作。
当您发现这些键入错误时,您可以应用的经验法则是用新的显式键入的 val 分隔行,直到找到需要指定类型的位置。
类似问题及解决方案:
The compiler need some help to infer the correct type for the aggregator parameter.
To make it compile you can try:
val store: Materialized[String, String, KeyValueStore[Bytes, Array[Byte]]] = ???
private def save(pea: KStream[String, String]): Unit = {
val aggregator: Aggregator[String, String, String] = (_, _, value: String) => value
pea
.groupByKey()
.aggregate(() => """{folder: ""}""",
aggregator,
store)
}
来自 here.
是 Java 中的另一个,它也说明了 Kafka Streams 中严格类型的不同风格:
Try moving generic types specification after as method:
val state: KTable[String, String] = builder
.table[String, String]("BARY-PATH", Materialized.as[String, String,KeyValueStore[org.apache.kafka.common.utils.Bytes, Array[Byte]]]("PATH-STORE"))
正如您从 Java 签名中看到的那样,对于静态方法,您应该为方法指定泛型类型而不是 class.
您可能会发现许多其他示例,甚至在 Kafka Streams 之外,都是关于 Scala 中显式类型需求的。
我正在尝试使用 kafka 流编写一个简单的过滤逻辑,但每次使用 selectKey 方法都会出错,无法进一步处理。
val builder: StreamsBuilder = new StreamsBuilder
val textLines: KStream[String, String] = builder.stream[String, String](inputTopic)
val filteredValue: KStream[String, String] = textLines.
filter((key: String, value: String) => value.contains(","))
.selectKey[String]((key: String, value: String) => value.split(",")(0).toLowerCase)
错误:
overloaded method value selectKey with alternatives: (x: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: String, _ <: String],x: org.apache.kafka.streams.kstream.Named)org.apache.kafka.streams.kstream.KStream[String,String] <and> (x: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: String, _ <: String])org.apache.kafka.streams.kstream.KStream[String,String] cannot be applied to ((String, String) ⇒ String)
在 Scala 中,您有时需要指定确切的类型绑定,因为类型推断不会像您在 Scala 中习惯的那样工作。 当您发现这些键入错误时,您可以应用的经验法则是用新的显式键入的 val 分隔行,直到找到需要指定类型的位置。
类似问题及解决方案:
The compiler need some help to infer the correct type for the aggregator parameter.
To make it compile you can try:
val store: Materialized[String, String, KeyValueStore[Bytes, Array[Byte]]] = ???
private def save(pea: KStream[String, String]): Unit = {
val aggregator: Aggregator[String, String, String] = (_, _, value: String) => value
pea
.groupByKey()
.aggregate(() => """{folder: ""}""",
aggregator,
store)
}
来自 here.
Try moving generic types specification after as method:
val state: KTable[String, String] = builder
.table[String, String]("BARY-PATH", Materialized.as[String, String,KeyValueStore[org.apache.kafka.common.utils.Bytes, Array[Byte]]]("PATH-STORE"))
正如您从 Java 签名中看到的那样,对于静态方法,您应该为方法指定泛型类型而不是 class.
您可能会发现许多其他示例,甚至在 Kafka Streams 之外,都是关于 Scala 中显式类型需求的。