在 jupyter notebook 中将自定义 jar 添加到 pyspark

Adding custom jars to pyspark in jupyter notebook

我正在使用带有 Pyspark 的 Jupyter 笔记本和以下 docker 图像Jupyter all-spark-notebook

现在我想编写一个 pyspark 流应用程序,它使用来自 Kafka 的消息。在 Spark-Kafka Integration guide 中,他们描述了如何使用 spark-submit 部署此类应用程序(它需要 linking 一个外部 jar - 解释在 3.Deploying 中)。但是因为我使用的是 Jupyter notebook,所以我实际上从未 运行 spark-submit 命令,我假设如果我按执行,它会在后面得到 运行。

spark-submit 命令中你可以指定一些参数,其中之一是 -jars,但我不清楚如何从笔记本(或外部环境)设置这个参数变量?)。我假设我可以通过 SparkConfSparkContext 对象动态地 link 这个外部 jar。有没有人有过如何从笔记本正确执行 linking 的经验?

您可以通过设置相关的环境变量,使用 pyspark 命令 运行 您的 jupyter notebook:

export PYSPARK_DRIVER_PYTHON=jupyter
export IPYTHON=1
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --port=XXX --ip=YYY"

XXX 是您要用来访问笔记本的端口,YYY 是 IP 地址。

现在只需 运行 pyspark 并添加 --jars 作为开关,就像 spark submit

我已经设法让它在 jupyter notebook 中工作,它是 运行 形成 all-spark 容器。

我在 jupyterhub 中启动一个 python3 笔记本并覆盖 PYSPARK_SUBMIT_ARGS 标志,如下所示。 Kafka消费者库是从maven仓库下载的,放在我家目录/home/jovyan:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = 
  '--jars /home/jovyan/spark-streaming-kafka-assembly_2.10-1.6.1.jar pyspark-shell'

import pyspark
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext

sc = pyspark.SparkContext()
ssc = StreamingContext(sc,1)

broker = "<my_broker_ip>"
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test1"],
                        {"metadata.broker.list": broker})
directKafkaStream.pprint()
ssc.start()

注意:不要忘记环境变量中的pyspark-shell!

扩展: 如果你想包含来自 spark-packages 的代码,你可以使用 --packages 标志。可以找到有关如何在 all-spark-notebook 中执行此操作的示例 here

为了使用 spark 处理 jupyter-notebook,您需要在创建 sparkContext 对象之前提供外部 jar 的位置。 pyspark --jars youJar 将创建一个带有外部罐子位置的 sparkcontext

确实,有一种方法可以在您创建 SparkSession 时通过 SparkConf 对象动态 link 它,如 :

中所述
spark = SparkSession \
    .builder \
    .appName("My App") \
    .config("spark.jars", "/path/to/jar.jar,/path/to/another/jar.jar") \
    .getOrCreate()

以防有人和我一样:我尝试了以上所有解决方案,其中 none 对我有用。我想做的是在 Jupyter notebook 中使用 Delta Lake

我终于可以通过先调用 SparkContext.addPyFile("/path/to/your/jar.jar") 来使用 from delta.tables import *。虽然在 spark 官方文档中,它只提到添加 .zip.py 文件,但我尝试 .jar 并且它完美地工作。