Pyspark 应用程序仅部分利用 dataproc 集群资源
Pyspark application only partly exploits dataproc cluster resources
我的 pyspark 应用程序 运行 是一个 106,36 MB 数据集(817.270 条记录)的 UDF,使用常规 python lambda 函数大约需要 100 小时。我生成了一个 Google Dataproc 集群,其中有 20 个工作节点,每个节点有 8 个 vCPU。但是,执行时总共只使用了 3 个节点和 3 个 vCPU。显然,我希望集群使用我提供的所有资源。
我生成的数据帧的默认分区数是 8。我尝试将其重新分区为 100,但集群一直只使用 3 个节点和 3 个 vCPU。另外,当我 运行 一个命令来检查 spark 看到的执行者数量时,它只有 3 个。
这是执行的 pyspark 代码:
from pyspark.sql.types import StringType, MapType
from pyspark.sql.functions import udf
customer_names = spark.createDataFrame(customer_names)
embargo_match_udf = udf(lambda x,y: embargoMatch(x,y), MapType(StringType(), StringType()))
customer_names = customer_names.withColumn('JaroDistance', embargo_match_udf('name','customer_code'))
result = customer_names.withColumn('jaro_similarity', customer_names.JaroDistance['max_jaro'])
result.write.format("com.databricks.spark.csv").save('gs://charles-embargo-bucket/sparkytuesday')
这是从我的 jupyter notebook 中看到的一些 spark 输出
print(sc) -> <SparkContext master=yarn appName=PySparkShell>
print(result.rdd.getNumPartitions()) -> 8
result = result.repartition(100)
print(result.rdd.getNumPartitions()) -> 100
sc._jsc.sc().getExecutorMemoryStatus().size() -> 3
对于那些对我如何解决问题感兴趣的人:
默认情况下,我的 spark 上下文假定有两个工作节点,无论我在 Google Cloud 的 Dataproc UI 中生成了多少额外节点。因此,我手动更改了 Spark 上下文,如下所示:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.conf import SparkConf
sc.stop()
SparkContext.setSystemProperty('spark.executor.cores', '4')
SparkContext.setSystemProperty('spark.executor.instances', '5')
sc = SparkContext("yarn", "embargotest")
spark = SparkSession.builder.appName('embargotest').getOrCreate()
此外,在将 .withColumn 函数应用于此数据框之前,我将 customer_names 数据集明确划分为 20 个(4 核 x 5 个实例)。
customer_names = spark.createDataFrame(customer_names).repartition(20)
希望这可以帮助遇到类似问题的人!
此外,您可能想尝试以下操作,让 PySpark 通过 Dynamic Allocation 动态调整应用程序中的执行程序数量:
SparkContext.setSystemProperty("spark.dynamicAllocation.enabled", "true")
我的 pyspark 应用程序 运行 是一个 106,36 MB 数据集(817.270 条记录)的 UDF,使用常规 python lambda 函数大约需要 100 小时。我生成了一个 Google Dataproc 集群,其中有 20 个工作节点,每个节点有 8 个 vCPU。但是,执行时总共只使用了 3 个节点和 3 个 vCPU。显然,我希望集群使用我提供的所有资源。
我生成的数据帧的默认分区数是 8。我尝试将其重新分区为 100,但集群一直只使用 3 个节点和 3 个 vCPU。另外,当我 运行 一个命令来检查 spark 看到的执行者数量时,它只有 3 个。
这是执行的 pyspark 代码:
from pyspark.sql.types import StringType, MapType
from pyspark.sql.functions import udf
customer_names = spark.createDataFrame(customer_names)
embargo_match_udf = udf(lambda x,y: embargoMatch(x,y), MapType(StringType(), StringType()))
customer_names = customer_names.withColumn('JaroDistance', embargo_match_udf('name','customer_code'))
result = customer_names.withColumn('jaro_similarity', customer_names.JaroDistance['max_jaro'])
result.write.format("com.databricks.spark.csv").save('gs://charles-embargo-bucket/sparkytuesday')
这是从我的 jupyter notebook 中看到的一些 spark 输出
print(sc) -> <SparkContext master=yarn appName=PySparkShell>
print(result.rdd.getNumPartitions()) -> 8
result = result.repartition(100)
print(result.rdd.getNumPartitions()) -> 100
sc._jsc.sc().getExecutorMemoryStatus().size() -> 3
对于那些对我如何解决问题感兴趣的人:
默认情况下,我的 spark 上下文假定有两个工作节点,无论我在 Google Cloud 的 Dataproc UI 中生成了多少额外节点。因此,我手动更改了 Spark 上下文,如下所示:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.conf import SparkConf
sc.stop()
SparkContext.setSystemProperty('spark.executor.cores', '4')
SparkContext.setSystemProperty('spark.executor.instances', '5')
sc = SparkContext("yarn", "embargotest")
spark = SparkSession.builder.appName('embargotest').getOrCreate()
此外,在将 .withColumn 函数应用于此数据框之前,我将 customer_names 数据集明确划分为 20 个(4 核 x 5 个实例)。
customer_names = spark.createDataFrame(customer_names).repartition(20)
希望这可以帮助遇到类似问题的人!
此外,您可能想尝试以下操作,让 PySpark 通过 Dynamic Allocation 动态调整应用程序中的执行程序数量:
SparkContext.setSystemProperty("spark.dynamicAllocation.enabled", "true")