多个后续连接的 Spark 性能问题
Spark performance issues with multiple subsequent joins
我们正在将大量本地 运行ning Python ETL 代码(使用 pandas)迁移到 Databricks 上的 Spark 运行ning。我们正在 运行 解决执行许多后续连接的部分中的一些性能问题(运行 在 pandas 中没问题)。
我们运行将我们的代码作为一个包放在 Databricks 集群上(在这个问题中很难共享孤立的、有效的代码)。所有联接随后在一个函数中发生。
我们加入的主数据框不是特别大:44 列有 819.000 条记录。总共我们将 27 个其他数据框连接到这个主数据框,每个只向主数据框添加 1 到 3 个额外的列。
所有数据框都连接在同一列(客户唯一标识符)上,该列在主数据框中是唯一的,不应以任何方式倾斜。
当我们 运行 我们在数据块集群上的完整代码并尝试执行任何 action 时,问题就出现了 count() 或 display() 在 之后,所有连接都已 运行。 运行一旦必须执行所有后续连接,对主数据帧进行简单计数的时间就会激增,我们不明白这是如何发生的。
一些额外信息:
- 集群是一个简单的 DS4_v2(28GB 内存,8 个内核),具有 2-8 个工作节点。
- 将默认的 shuffle 分区编号更改为非常低的编号(如 2)似乎没有帮助
- 连接之间的持久化和非持久化数据帧似乎没有帮助
- 扩大或缩小集群似乎没有帮助
下面是执行的所有联接的代码。每个代码块都是一个单独函数的一部分。这些函数随后在我们在 Databricks 集群上调用的另一个函数中调用。 df_pop
是我们的主要数据框。
加入 1:
df_pop = df_pop.join(other_df1, df_pop.bc == other_df1.bc, how='left_outer')
加入 2 - 21(循环 20 个人口数据帧并将每个数据帧加入 df_pop
):
for pop in self.des_config.get('populations'):
population = self.cleaned_data.get(pop).withColumnRenamed('bc', f'bc_{pop}').select(col(f'bc_{pop}'))
df_pop = df_pop.join(population, df_pop.bc == f'bc_{pop}', how = 'left_outer')\
.withColumn(f'bc_{pop}', F.when(col(f'bc_{pop}').isNull(), F.lit(False)).otherwise(F.lit(True)))\
.withColumnRenamed(f'bc_{pop}', f'pop_{pop}')
加入 22 - 25:
df_pop = df_pop.withColumn(f'pop_Real Estate', df_pop.bo_sector == 'REAL ESTATE')\
.withColumn(f'pop_O&O', df_pop.bo_sector == 'GOVERNMENT & EDUCATION')\
.join(other_df2, on = 'bc', how = 'left_outer').drop('other_df2.bc')\
.join(other_df3, on = 'bc', how = 'left_outer').drop('other_df3.bc')\
.join(other_df4, on = 'bc', how = 'left_outer').drop('other_df4.bc')\
.join(other_df5, other_df5.bc == df_pop.bc, how = 'left_outer')
加入 26:
df_pop = df_pop.join(other_df6, other_df6.bc == df_pop.bc, how = 'Left_outer')\
.fillna(False, subset=['pop_edr_backlog'])
加入 27:
df_pop = df_pop.join(other_df7, other_df7.bc == df_pop.bc, how = 'left_outer')
在这些加入后无法对 df_pop
执行任何操作。知道如何解决这个问题吗?
我们刚刚解决了这个问题。由于在加入之前发生的 .withColumnRenamed,在 for 循环中执行的加入 2-21 显然导致了引擎盖下的 'broadcast nested loop join'。删除它导致 'broadcast hash joins' 被执行得更快。所以用 `df_pop.explain('formatted') 检查执行计划并检查正在执行的连接类型。
我们正在将大量本地 运行ning Python ETL 代码(使用 pandas)迁移到 Databricks 上的 Spark 运行ning。我们正在 运行 解决执行许多后续连接的部分中的一些性能问题(运行 在 pandas 中没问题)。
我们运行将我们的代码作为一个包放在 Databricks 集群上(在这个问题中很难共享孤立的、有效的代码)。所有联接随后在一个函数中发生。
我们加入的主数据框不是特别大:44 列有 819.000 条记录。总共我们将 27 个其他数据框连接到这个主数据框,每个只向主数据框添加 1 到 3 个额外的列。
所有数据框都连接在同一列(客户唯一标识符)上,该列在主数据框中是唯一的,不应以任何方式倾斜。
当我们 运行 我们在数据块集群上的完整代码并尝试执行任何 action 时,问题就出现了 count() 或 display() 在 之后,所有连接都已 运行。 运行一旦必须执行所有后续连接,对主数据帧进行简单计数的时间就会激增,我们不明白这是如何发生的。
一些额外信息:
- 集群是一个简单的 DS4_v2(28GB 内存,8 个内核),具有 2-8 个工作节点。
- 将默认的 shuffle 分区编号更改为非常低的编号(如 2)似乎没有帮助
- 连接之间的持久化和非持久化数据帧似乎没有帮助
- 扩大或缩小集群似乎没有帮助
下面是执行的所有联接的代码。每个代码块都是一个单独函数的一部分。这些函数随后在我们在 Databricks 集群上调用的另一个函数中调用。 df_pop
是我们的主要数据框。
加入 1:
df_pop = df_pop.join(other_df1, df_pop.bc == other_df1.bc, how='left_outer')
加入 2 - 21(循环 20 个人口数据帧并将每个数据帧加入 df_pop
):
for pop in self.des_config.get('populations'):
population = self.cleaned_data.get(pop).withColumnRenamed('bc', f'bc_{pop}').select(col(f'bc_{pop}'))
df_pop = df_pop.join(population, df_pop.bc == f'bc_{pop}', how = 'left_outer')\
.withColumn(f'bc_{pop}', F.when(col(f'bc_{pop}').isNull(), F.lit(False)).otherwise(F.lit(True)))\
.withColumnRenamed(f'bc_{pop}', f'pop_{pop}')
加入 22 - 25:
df_pop = df_pop.withColumn(f'pop_Real Estate', df_pop.bo_sector == 'REAL ESTATE')\
.withColumn(f'pop_O&O', df_pop.bo_sector == 'GOVERNMENT & EDUCATION')\
.join(other_df2, on = 'bc', how = 'left_outer').drop('other_df2.bc')\
.join(other_df3, on = 'bc', how = 'left_outer').drop('other_df3.bc')\
.join(other_df4, on = 'bc', how = 'left_outer').drop('other_df4.bc')\
.join(other_df5, other_df5.bc == df_pop.bc, how = 'left_outer')
加入 26:
df_pop = df_pop.join(other_df6, other_df6.bc == df_pop.bc, how = 'Left_outer')\
.fillna(False, subset=['pop_edr_backlog'])
加入 27:
df_pop = df_pop.join(other_df7, other_df7.bc == df_pop.bc, how = 'left_outer')
在这些加入后无法对 df_pop
执行任何操作。知道如何解决这个问题吗?
我们刚刚解决了这个问题。由于在加入之前发生的 .withColumnRenamed,在 for 循环中执行的加入 2-21 显然导致了引擎盖下的 'broadcast nested loop join'。删除它导致 'broadcast hash joins' 被执行得更快。所以用 `df_pop.explain('formatted') 检查执行计划并检查正在执行的连接类型。