多个后续连接的 Spark 性能问题

Spark performance issues with multiple subsequent joins

我们正在将大量本地 运行ning Python ETL 代码(使用 pandas)迁移到 Databricks 上的 Spark 运行ning。我们正在 运行 解决执行许多后续连接的部分中的一些性能问题(运行 在 pandas 中没问题)。

我们运行将我们的代码作为一个包放在 Databricks 集群上(在这个问题中很难共享孤立的、有效的代码)。所有联接随后在一个函数中发生。

我们加入的主数据框不是特别大:44 列有 819.000 条记录。总共我们将 27 个其他数据框连接到这个主数据框,每个只向主数据框添加 1 到 3 个额外的列。

所有数据框都连接在同一列(客户唯一标识符)上,该列在主数据框中是唯一的,不应以任何方式倾斜。

当我们 运行 我们在数据块集群上的完整代码并尝试执行任何 action 时,问题就出现了 count()display() 之后,所有连接都已 运行。 运行一旦必须执行所有后续连接,对主数据帧进行简单计数的时间就会激增,我们不明白这是如何发生的。

一些额外信息:

下面是执行的所有联接的代码。每个代码块都是一个单独函数的一部分。这些函数随后在我们在 Databricks 集群上调用的另一个函数中调用。 df_pop 是我们的主要数据框。

加入 1:

df_pop = df_pop.join(other_df1, df_pop.bc == other_df1.bc, how='left_outer')

加入 2 - 21(循环 20 个人口数据帧并将每个数据帧加入 df_pop):

for pop in self.des_config.get('populations'):
    population = self.cleaned_data.get(pop).withColumnRenamed('bc', f'bc_{pop}').select(col(f'bc_{pop}'))
    df_pop = df_pop.join(population, df_pop.bc == f'bc_{pop}', how = 'left_outer')\
                   .withColumn(f'bc_{pop}', F.when(col(f'bc_{pop}').isNull(), F.lit(False)).otherwise(F.lit(True)))\
                   .withColumnRenamed(f'bc_{pop}', f'pop_{pop}')

加入 22 - 25:

df_pop = df_pop.withColumn(f'pop_Real Estate', df_pop.bo_sector == 'REAL ESTATE')\
                       .withColumn(f'pop_O&O', df_pop.bo_sector == 'GOVERNMENT & EDUCATION')\
                       .join(other_df2, on = 'bc', how = 'left_outer').drop('other_df2.bc')\
                       .join(other_df3, on = 'bc', how = 'left_outer').drop('other_df3.bc')\
                       .join(other_df4, on = 'bc', how = 'left_outer').drop('other_df4.bc')\
                       .join(other_df5, other_df5.bc == df_pop.bc, how = 'left_outer')

加入 26:

df_pop = df_pop.join(other_df6, other_df6.bc == df_pop.bc, how = 'Left_outer')\
                        .fillna(False, subset=['pop_edr_backlog'])

加入 27:

df_pop = df_pop.join(other_df7, other_df7.bc == df_pop.bc, how = 'left_outer')

在这些加入后无法对 df_pop 执行任何操作。知道如何解决这个问题吗?

我们刚刚解决了这个问题。由于在加入之前发生的 .withColumnRenamed,在 for 循环中执行的加入 2-21 显然导致了引擎盖下的 'broadcast nested loop join'。删除它导致 'broadcast hash joins' 被执行得更快。所以用 `df_pop.explain('formatted') 检查执行计划并检查正在执行的连接类型。