Kafka 流过滤问题
Issue with Kafka stream filtering
我正在尝试 运行 来自以下示例的基本应用程序:
但是我在这一行遇到异常:
// Variant 1: using `mapValues`
val uppercasedWithMapValues: KStream[Array[Byte], String] = textLines.mapValues(_.toUpperCase())
Error:(33, 25) missing parameter type for expanded function ((x) =>
x.toUpperCase())
textLines.mapValues(_.toUpperCase())
将光标悬停在代码上时出现错误:
Type mismatch, expected: ValueMapper[_ >: String, _ <: NotInferedVR],
actual: (Any) => Any Cannot resolve symbol toUpperCase
我的 sbt 文件的内容:
name := "untitled1"
version := "0.1"
scalaVersion := "2.11.11"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.11.0.0"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "0.11.0.0"
// https://mvnrepository.com/artifact/org.apache.kafka/connect-api
libraryDependencies += "org.apache.kafka" % "connect-api" % "0.11.0.0"
我真的不确定如何继续,因为我对 Scala 很陌生。我想知道问题是什么以及如何解决。
The root cause of this problem is Scala-Java interoperability – the Kafka Streams API is implemented in Java, but your application is written in Scala. Notably, this problem is caused by how the type systems of Java and Scala interact. Generic wildcards in Java, for example, are often causing such Scala issues.
To fix the problem you would need to declare types explicitly in your Scala application in order for the code to compile. For example, you may need to break a single statement that chains multiple DSL operations into multiple statements, where each statement explicitly declares the respective return types. The StreamToTableJoinScalaIntegrationTest demonstrates how the types of return variables are explicitly declared.
更新
Kafka 2.0(将于 6 月发布)包含一个适当的 Scala API,可以避免这些问题。比较 https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams
我正在尝试 运行 来自以下示例的基本应用程序:
但是我在这一行遇到异常:
// Variant 1: using `mapValues`
val uppercasedWithMapValues: KStream[Array[Byte], String] = textLines.mapValues(_.toUpperCase())
Error:(33, 25) missing parameter type for expanded function ((x) => x.toUpperCase()) textLines.mapValues(_.toUpperCase())
将光标悬停在代码上时出现错误:
Type mismatch, expected: ValueMapper[_ >: String, _ <: NotInferedVR], actual: (Any) => Any Cannot resolve symbol toUpperCase
我的 sbt 文件的内容:
name := "untitled1"
version := "0.1"
scalaVersion := "2.11.11"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.11.0.0"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "0.11.0.0"
// https://mvnrepository.com/artifact/org.apache.kafka/connect-api
libraryDependencies += "org.apache.kafka" % "connect-api" % "0.11.0.0"
我真的不确定如何继续,因为我对 Scala 很陌生。我想知道问题是什么以及如何解决。
The root cause of this problem is Scala-Java interoperability – the Kafka Streams API is implemented in Java, but your application is written in Scala. Notably, this problem is caused by how the type systems of Java and Scala interact. Generic wildcards in Java, for example, are often causing such Scala issues.
To fix the problem you would need to declare types explicitly in your Scala application in order for the code to compile. For example, you may need to break a single statement that chains multiple DSL operations into multiple statements, where each statement explicitly declares the respective return types. The StreamToTableJoinScalaIntegrationTest demonstrates how the types of return variables are explicitly declared.
更新
Kafka 2.0(将于 6 月发布)包含一个适当的 Scala API,可以避免这些问题。比较 https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams