Google Cloud Dataproc 上的 Pyspark 作业失败

Pyspark Job Failure on Google Cloud Dataproc

我创建了一个具有 1 个主节点和 10 个节点的 Dataproc 集群。都具有相同的 CPU 和内存配置:32 vCPU,120 GB 内存。当我提交一份处理大量数据和计算的工作时。作业失败。

从日志记录来看,我不太确定是什么原因导致了失败。但是我从 tJob#: job-c46fc848-6 看到了与内存相关的错误消息: 容器因超出内存限制而被 YARN 杀死。使用了 24.1 GB 的 24 GB 物理内存。考虑提升 spark.yarn.executor.memoryOverhead.

所以我尝试了一些从其他帖子中找到的解决方案。例如,当从 "Jobs" 控制台提交作业时,我试图在 "Properties" 部分增加 spark.executor.memoryOverhead 和 spark.driver.maxResultSize。 job# find-duplicate-job-c46fc848-7 仍然失败。

我也看到了警告消息,但不太确定它是什么意思: 18/06/04 17:13:25 警告 org.apache.spark.storage.BlockManagerMasterEndpoint:rdd_43_155 没有更多副本可用!

我打算尝试创建一个更高级别的集群,看看它是否有效。但我怀疑它是否能解决这个问题,因为集群有 1 个主节点和 10 个节点,32 个 vCPU,120 GB 内存已经很强大了。

希望得到高级用户和专家的帮助。提前致谢!

失败的根本原因与自交叉连接导致的内存有关。即使我不断增加 CPU 的力量和记忆,它仍然失败。所以这个解决方案是以下组合。

  1. 使用repartition() 函数在join 之后,在下一次转换之前重新分区。这将解决数据倾斜问题。例如:df_joined = df_joined.repartition(分区)
  2. 广播权table。
  3. 将其分成 10 次迭代。在每次迭代中,我只处理左侧 table 的 1/10 与右侧 table 的完整数据相结合。

查看示例代码:

groups = 10 <br/>
for x in range(0, groups): 
  df_joined = df1.join(broadcast(df2), (df1.authors == df2.authors)).where((col("df1.content_id") % groups == x)) 

结合以上 3 种方法,我能够在 1.5 小时内完成工作,并且只使用了 1 个主节点和 4 个工作节点(每个 vm 8 CPU 和 30 GB)。