使用 spark-submit 从 google dataproc spark 集群读取 GCP 中的 JSON(zipped .gz) 时,未使用所有执行程序

All the executors are not being used when reading JSON(zipped .gz) in GCP from google dataproc spark cluster using spark-submit

我刚刚使用 GCP(dataproc) 和 pyspark 了解了大数据和云技术这个美妙的世界。我有 ~5 GB 大小 JSON 文件(压缩,gz 文件)包含 ~500 万 记录,我需要读取每一行并只处理满足特定条件的那些行。我有我的工作代码,我用 --num-partitions=5 发出了 spark-submit 但仍然 只有一个工人 用于执行动作。

这是我正在使用的 spark-submit 命令:

spark-submit --num-executors 5 --py-files /home/user/code/dist/package-0.1-py3.6.egg job.py

job.py:

path = "gs://dataproc-bucket/json-files/data_5M.json.gz"
mi = spark.read.json(path)
inf_rel = mi.select(mi.client_id,
                    mi.user_id,
                    mi.first_date,
                    F.hour(mi.first_date).alias('hour'),
                    mi.notes).rdd.map(foo).filter(lambda x: x)
inf_relevance = inf_rel.map(lambda l: Row(**dict(l))).toDF()
save_path = "gs://dataproc-bucket/json-files/output_5M.json"
inf_relevance.write.mode('append').json(save_path)
print("END!!")

Dataproc 配置: (我现在使用免费帐户,一旦我开始工作解决方案将添加更多核心和执行程序)

(Debian 9、Hadoop 2.9、Spark 2.4) Master node:2 vCPU, 7.50 GB 内存 主磁盘大小:32 GB 5 个工作节点:1 个 vCPU,3.75 GB 内存 主磁盘类型:32 GB

在 spark-submit 之后,我可以在网络中看到 UI 添加了 5 个执行程序,但只有 1 个执行程序保持活动状态并执行所有任务,其余 4 个被释放。

我做了研究,大部分问题都是关于通过 JDBC 访问数据的。

请提出我在这里缺少的内容。

P.S。最终我会读取 64 json 个文件,每个文件 5 GB,因此可能会使用 8 个核心 * 100 个工作人员。

最好的办法是对输入进行预处理。给定一个输入文件,spark.read.json(... 将创建一个任务来读取和解析 JSON 数据,因为 Spark 无法提前知道如何并行化它。如果您的数据采用 line-delimited JSON 格式 (http://jsonlines.org/),最好的做法是事先将其拆分为可管理的块:

path = "gs://dataproc-bucket/json-files/data_5M.json"
# read monolithic JSON as text to avoid parsing, repartition and *then* parse JSON
mi = spark.read.json(spark.read.text(path).repartition(1000).rdd)
inf_rel = mi.select(mi.client_id,
                   mi.user_id,
                   mi.first_date,
                   F.hour(mi.first_date).alias('hour'),
                   mi.notes).rdd.map(foo).filter(lambda x: x)
inf_relevance = inf_rel.map(lambda l: Row(**dict(l))).toDF()
save_path = "gs://dataproc-bucket/json-files/output_5M.json"
inf_relevance.write.mode('append').json(save_path)
print("END!!")

您在此处的初始步骤 (spark.read.text(...) 作为单个任务仍会遇到瓶颈。如果您的数据不是 line-delimited 或者(尤其是!)您预计您将需要不止一次地使用这些数据,您应该想办法将您的 5GB JSON 文件变成 1000 个 5MB JSON 文件,然后再让 Spark 参与其中。

.gz 文件不可拆分,因此它们由一个内核读取并放在一个分区上。

参考