无法使用 spark sql 读取 kafka
Unable to read kafka using spark sql
我正在尝试使用 spark 阅读 kafka,但我想我面临着一些与图书馆相关的问题。
我正在将一些事件推送到 kafka 主题,我可以通过 kafka 控制台消费者阅读这些主题,但无法通过 spark 阅读。我正在使用 spark-sql-kafka 库,该项目是用 maven 编写的。 Scala版本是2.11.12,spark版本是2.4.3.
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.3</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.4.3</version>
<scope>provided</scope>
</dependency>
我的 java 代码如下:-
SparkSession spark = SparkSession.builder()
.appName("kafka-tutorials")
.master("local[*]")
.getOrCreate();
Dataset<Row> rows = spark.readStream().
format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "meetup-trending-topics")
.option("startingOffsets", "latest")
.load();
rows.writeStream()
.outputMode("append")
.format("console")
.start();
spark.streams().awaitAnyTermination();
spark.stop();
以下是我收到的错误消息:-
线程异常"main" org.apache.spark.sql.AnalysisException:找不到数据源:kafka。请按照 "Structured Streaming + Kafka Integration Guide" 的部署部分部署应用程序;
在 org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
在 org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
解决方法:-
两者之一 1)create uber jar 或 ii) --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3
我之前在 mainclass .
之后给出了 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 选项
这个:
<scope>provided</scope>
表示您负责提供合适的jar。我(和许多其他人)更愿意避免使用此范围,而是构建一个 uberjar 进行部署。
我正在尝试使用 spark 阅读 kafka,但我想我面临着一些与图书馆相关的问题。
我正在将一些事件推送到 kafka 主题,我可以通过 kafka 控制台消费者阅读这些主题,但无法通过 spark 阅读。我正在使用 spark-sql-kafka 库,该项目是用 maven 编写的。 Scala版本是2.11.12,spark版本是2.4.3.
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.3</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.4.3</version>
<scope>provided</scope>
</dependency>
我的 java 代码如下:-
SparkSession spark = SparkSession.builder()
.appName("kafka-tutorials")
.master("local[*]")
.getOrCreate();
Dataset<Row> rows = spark.readStream().
format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "meetup-trending-topics")
.option("startingOffsets", "latest")
.load();
rows.writeStream()
.outputMode("append")
.format("console")
.start();
spark.streams().awaitAnyTermination();
spark.stop();
以下是我收到的错误消息:-
线程异常"main" org.apache.spark.sql.AnalysisException:找不到数据源:kafka。请按照 "Structured Streaming + Kafka Integration Guide" 的部署部分部署应用程序; 在 org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652) 在 org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
解决方法:- 两者之一 1)create uber jar 或 ii) --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 我之前在 mainclass .
之后给出了 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 选项这个:
<scope>provided</scope>
表示您负责提供合适的jar。我(和许多其他人)更愿意避免使用此范围,而是构建一个 uberjar 进行部署。