How to run Spark Streaming application with Kafka Direct Stream in IntelliJ IDEA?

I am running a Spark program that uses Kafka for streaming, but I get an error. All the imports are in place and nothing looks wrong.

I wrote some code in IntelliJ IDEA, but I get an error the first time I run the program. I am new to Scala but come from a C# background, so I cannot make sense of the problem. The ZooKeeper service is up, the Kafka server is running, and a topic named topicA has been created. A producer is also ready to stream data, but I have trouble running the code in IntelliJ that listens to the queue.
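
For reference, since the producer code was not included in the post, here is a minimal sketch of what that side might look like, assuming the standard kafka-clients API and the broker on localhost:9092 (the object name and message contents are just for illustration):

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SimpleProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Send a few test messages to the topic the streaming job subscribes to
    (1 to 10).foreach { i =>
      producer.send(new ProducerRecord[String, String]("topicA", s"key-$i", s"message-$i"))
    }
    producer.close()
  }
}

The streaming code that fails is: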

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils}

object sparkStreamClass {
  def main(args: Array[String]): Unit = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "0",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // setMaster is needed when launching directly from the IDE
    // instead of through spark-submit
    val conf = new SparkConf()
      .setAppName("Simple Streaming Application")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val topics = Array("topicA")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.foreachRDD { rdd =>
      // Get the offset ranges in the RDD
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (o <- offsetRanges) {
        println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
      }
    }

    ssc.start()

    // The code above prints the topic/partition offset details every
    // 5 seconds until the context is stopped.
    ssc.awaitTermination()
    ssc.stop(stopSparkContext = false)
  }
}

The exception produced is:

Exception in thread "main" java.lang.VerifyError: class scala.collection.mutable.WrappedArray overrides final method toBuffer.()Lscala/collection/mutable/Buffer;
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.SparkConf.loadFromSystemProperties(SparkConf.scala:75)
at org.apache.spark.SparkConf.<init>(SparkConf.scala:70)
at org.apache.spark.SparkConf.<init>(SparkConf.scala:57)
at sparkStreamClass$.main(sparkStreamClass.scala:20)
at sparkStreamClass.main(sparkStreamClass.scala)
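
A java.lang.VerifyError on scala.collection.mutable.WrappedArray is the classic symptom of a Scala binary-compatibility clash: the _2.11 Spark artifacts expect Scala 2.11 on the classpath, but a different Scala version (for example, the IDE's 2.12/2.13 SDK) is being loaded. A minimal sketch to confirm which Scala the application actually runs with, using only the standard library (the object name is just for illustration):

// Prints the Scala version present on the runtime classpath.
// With *_2.11 Spark artifacts this must report a 2.11.x version;
// a 2.12/2.13 result would explain the VerifyError above.
object ScalaVersionCheck {
  def main(args: Array[String]): Unit =
    println(scala.util.Properties.versionString) // e.g. "version 2.11.8"
}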

Here is my pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.learnStreaming</groupId>
    <artifactId>sparkProjectArtifact</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>

I modified the pom.xml and it worked for me (all Spark artifacts now share a single version through the spark.version property, and the _2.11 suffix matches the Scala 2.11 SDK):

<properties>
    <spark.version>2.1.0</spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>

I also ran into this problem; it was caused by using the wrong Scala version. For example, I had Scala 2.11.8 defined in my Maven build plugin but was actually using Scala 2.13. So check the Scala version in your pom's build section and use the same Scala version in the IDE.
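
For illustration, a minimal sketch of pinning the Scala version in the pom so that the Maven build and the IDE agree; the net.alchim31.maven scala-maven-plugin and its version are assumptions, not taken from the original post:

<properties>
    <scala.version>2.11.8</scala.version>
</properties>

<build>
    <plugins>
        <!-- assumed plugin; compiles Scala sources with the pinned version -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.4.6</version>
            <configuration>
                <scalaVersion>${scala.version}</scalaVersion>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>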

We ran into this problem because of a version incompatibility between the Scala and Spark libraries.

I hit a similar error with the following configuration: spark-core_2.11, spark-sql_2.11, and Scala 2.13.

I installed Scala 2.11 on my machine, added the new Scala SDK library to the project module, and it worked.