Spark Streaming + Kafka - Spark Session API
I would appreciate help with running a Spark Streaming program on Spark 2.0.2. It fails with the error "java.lang.ClassNotFoundException: Failed to find data source: kafka". The modified POM file is below.

The SparkSession is created fine, but the error is thrown when calling load on the Kafka source.

Creating the Spark session:
val spark = SparkSession
  .builder()
  .master(master)
  .appName("Apache Log Analyzer Streaming from Kafka")
  .config("hive.metastore.warehouse.dir", hiveWarehouse)
  .config("fs.defaultFS", hdfs_FS)
  .enableHiveSupport()
  .getOrCreate()
Creating the Kafka stream:
val logLinesDStream = spark
  .readStream
  .format("kafka")
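  // note: 2181 is the ZooKeeper client port; bootstrap.servers should point at a
  // Kafka broker (typically localhost:9092), not at ZooKeeper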
.option("kafka.bootstrap.servers", "localhost:2181")
.option("subscribe", topics)
.load()
Error message:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark-packages.org
POM.XML:
<scala.version>2.10.4</scala.version>
<scala.compat.version>2.10</scala.compat.version>
<spark.version>2.0.2</spark.version>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.5</version>
    </dependency>
</dependencies>
You are referencing Spark's Kafka v1.5.1 reference when you actually need 2.0.2. You also need sql-kafka for Structured Streaming:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
    <version>2.0.2</version>
</dependency>
Note that the SparkSession API only supports Kafka >= 0.10.
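With that artifact on the classpath, a minimal Structured Streaming pipeline against Kafka looks roughly like the sketch below; the broker address and the topic name "logs" are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("KafkaSketch").getOrCreate()

// subscribe to a topic; bootstrap.servers must name a Kafka broker, not ZooKeeper
val records = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "logs")
  .load()

// Kafka delivers key/value as binary columns; cast the value to a string
val lines = records.selectExpr("CAST(value AS STRING) AS line")

// echo batches to the console and block until the query stops
val query = lines.writeStream
  .format("console")
  .start()

query.awaitTermination()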
Fixed it by changing the pom.xml to the Scala 2.11 build of the connector (the remaining Spark artifacts then need to move to their _2.11 variants as well, since Scala 2.10 and 2.11 binaries cannot be mixed on one classpath):
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
I had the same issue. I upgraded the Spark version from 2.0.0 to 2.2.0 and added the spark-sql-kafka dependency, and it now works perfectly for me. The dependencies are below.
<spark.version>2.2.0</spark.version>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>
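If the job is launched with spark-submit rather than from an IDE, the connector can also be supplied at submit time with --packages, which resolves the artifact and its transitive dependencies (including kafka-clients) onto the driver and executor classpaths. A sketch, using the coordinates above with a hypothetical main class and jar name:

# main class and jar path are hypothetical; adjust to your build
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
  --class com.example.LogAnalyzer \
  target/log-analyzer.jar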