如何正确提交 kafka 流式 pyspark 作业到 Google Dataproc

How to properly submit kafka streaming pyspark job to Google Dataproc

我正在尝试通过 Dataproc UI 提交一个 pyspark 作业,但一直出现错误,看起来它没有加载 kafka 流媒体包。

这是我工作中 UI 提供的 REST 命令: POST /v1/projects/projectname/regions/global/jobs:submit/ { "projectId": "projectname", "job": { "placement": { "clusterName": "cluster-main" }, "reference": { "jobId": "job-33ab811a" }, "pysparkJob": { "mainPythonFileUri": "gs://projectname/streaming.py", "args": [ "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0" ], "jarFileUris": [ "gs://projectname/spark-streaming-kafka-0-10_2.11-2.2.0.jar" ] } } }

我尝试将 kafka 包作为 args 和 jar 文件传递​​。

这是我的代码 (streaming.py):

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json


sc = SparkContext()

spark = SparkSession.builder.master("local").appName("Spark-Kafka-Integration").getOrCreate()

# < ip > is masked
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "<ip>:9092") \
    .option("subscribe", "rsvps") \
    .option("startingOffsets", "earliest") \
    .load()
df.printSchema()

错误: : java.lang.ClassNotFoundException: 找不到数据源:kafka。请在 http://spark.apache.org/third-party-projects.html

找到包裹

完整跟踪:https://pastebin.com/Uz3iGy2N

您可能 运行 遇到以下问题:“--packages” 是 spark-submit 中的语法糖,当 higher-level 工具 (Dataproc) 以编程方式调用 Spark 时,它的交互效果很差提交,使用我在此处的回复中描述的替代语法:

长话短说,您可以使用 properties 在 Dataproc 请求中指定等效的 spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,而不是在作业参数中传递 --properties