Spark Streaming + Kafka 集成 0.8.2.1
Spark Streaming + Kafka Integration 0.8.2.1
我在将 spark 与 kafka 集成时遇到问题。我使用 spark-streaming-kafka-0-8。我用 SBT 编译。
这是我的代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka._
object sparkKafka {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val kafkaStream = KafkaUtils.createStream(ssc,
"localhost:2181", "spark stream", Map("customer" -> 2))
kafkaStream.print()
ssc.start()
ssc.awaitTermination()
}
}
我收到这个错误:
`[info] Running sparkKafka
[error] (run-main-0) java.lang.NoClassDefFoundError: scala/Product$class
[error] java.lang.NoClassDefFoundError: scala/Product$class
[error] at org.apache.spark.SparkConf$DeprecatedConfig.<init>(SparkConf.scala:723)
[error] at org.apache.spark.SparkConf$.<init>(SparkConf.scala:571)
[error] at org.apache.spark.SparkConf$.<clinit>(SparkConf.scala)
[error] at org.apache.spark.SparkConf.set(SparkConf.scala:92)
[error] at org.apache.spark.SparkConf.set(SparkConf.scala:81)
[error] at org.apache.spark.SparkConf.setAppName(SparkConf.scala:118)
[error] at sparkKafka$.main(sparkKafka.scala:15)
[error] at sparkKafka.main(sparkKafka.scala)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:498)
[error] Caused by: java.lang.ClassNotFoundException: scala.Product$class
[error] at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
[error] at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
[error] at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
[error] at org.apache.spark.SparkConf$DeprecatedConfig.<init>(SparkConf.scala:723)
[error] at org.apache.spark.SparkConf$.<init>(SparkConf.scala:571)
[error] at org.apache.spark.SparkConf$.<clinit>(SparkConf.scala)
[error] at org.apache.spark.SparkConf.set(SparkConf.scala:92)
[error] at org.apache.spark.SparkConf.set(SparkConf.scala:81)
[error] at org.apache.spark.SparkConf.setAppName(SparkConf.scala:118)
[error] at sparkKafka$.main(sparkKafka.scala:15)
[error] at sparkKafka.main(sparkKafka.scala)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:498)
[error] Nonzero exit code: 1
[error] (Compile / run) Nonzero exit code: 1
[error] Total time: 6 s, completed Jan 14, 2019 2:19:15 PM.`
这是我的 build.sbt 文件:
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.2.0"
如何将 Spark Streaming 与 Kafka 交互?我什至有问题 spark-streaming-kafka-0-10....
谢谢
这是 Scala 或 Spark 的版本问题。确保您首先使用的是 Scala 2.11
如果您使用的是 Kafka 0.10 或更高版本(如果您最近安装了 Kafka,并且只是 运行 在本地,那么您可能会),那么您不应该使用 kafka-0-8
包。
不要将 spark-streaming-kafka-0-8
与 spark-streaming-kafka-0-10
混用
因此,如果您想使用 0-10
、,则包必须是 org.apache.spark.streaming.kafka010
,而不是 org.apache.spark.streaming.kafka
另外,请注意 0-8
确实使用了 Zookeeper(例如 localhost:2181
),而 0-10
没有。
我在将 spark 与 kafka 集成时遇到问题。我使用 spark-streaming-kafka-0-8。我用 SBT 编译。 这是我的代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka._
object sparkKafka {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val kafkaStream = KafkaUtils.createStream(ssc,
"localhost:2181", "spark stream", Map("customer" -> 2))
kafkaStream.print()
ssc.start()
ssc.awaitTermination()
}
}
我收到这个错误:
`[info] Running sparkKafka
[error] (run-main-0) java.lang.NoClassDefFoundError: scala/Product$class
[error] java.lang.NoClassDefFoundError: scala/Product$class
[error] at org.apache.spark.SparkConf$DeprecatedConfig.<init>(SparkConf.scala:723)
[error] at org.apache.spark.SparkConf$.<init>(SparkConf.scala:571)
[error] at org.apache.spark.SparkConf$.<clinit>(SparkConf.scala)
[error] at org.apache.spark.SparkConf.set(SparkConf.scala:92)
[error] at org.apache.spark.SparkConf.set(SparkConf.scala:81)
[error] at org.apache.spark.SparkConf.setAppName(SparkConf.scala:118)
[error] at sparkKafka$.main(sparkKafka.scala:15)
[error] at sparkKafka.main(sparkKafka.scala)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:498)
[error] Caused by: java.lang.ClassNotFoundException: scala.Product$class
[error] at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
[error] at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
[error] at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
[error] at org.apache.spark.SparkConf$DeprecatedConfig.<init>(SparkConf.scala:723)
[error] at org.apache.spark.SparkConf$.<init>(SparkConf.scala:571)
[error] at org.apache.spark.SparkConf$.<clinit>(SparkConf.scala)
[error] at org.apache.spark.SparkConf.set(SparkConf.scala:92)
[error] at org.apache.spark.SparkConf.set(SparkConf.scala:81)
[error] at org.apache.spark.SparkConf.setAppName(SparkConf.scala:118)
[error] at sparkKafka$.main(sparkKafka.scala:15)
[error] at sparkKafka.main(sparkKafka.scala)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:498)
[error] Nonzero exit code: 1
[error] (Compile / run) Nonzero exit code: 1
[error] Total time: 6 s, completed Jan 14, 2019 2:19:15 PM.`
这是我的 build.sbt 文件:
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.2.0"
如何将 Spark Streaming 与 Kafka 交互?我什至有问题 spark-streaming-kafka-0-10....
谢谢
这是 Scala 或 Spark 的版本问题。确保您首先使用的是 Scala 2.11
如果您使用的是 Kafka 0.10 或更高版本(如果您最近安装了 Kafka,并且只是 运行 在本地,那么您可能会),那么您不应该使用 kafka-0-8
包。
不要将 spark-streaming-kafka-0-8
与 spark-streaming-kafka-0-10
因此,如果您想使用 0-10
、org.apache.spark.streaming.kafka010
,而不是 org.apache.spark.streaming.kafka
另外,请注意 0-8
确实使用了 Zookeeper(例如 localhost:2181
),而 0-10
没有。