使用 Python 的 reduce() 加入多个 PySpark DataFrame
Using Python's reduce() to join multiple PySpark DataFrames
有谁知道为什么在加入多个 PySpark DataFrames 时使用 Python3 的 functools.reduce()
会导致比使用 for
循环迭代加入相同 DataFrames 更差的性能?具体来说,这会导致速度大幅下降,然后出现内存不足错误:
def join_dataframes(list_of_join_columns, left_df, right_df):
return left_df.join(right_df, on=list_of_join_columns)
joined_df = functools.reduce(
functools.partial(join_dataframes, list_of_join_columns), list_of_dataframes,
)
而这个没有:
joined_df = list_of_dataframes[0]
joined_df.cache()
for right_df in list_of_dataframes[1:]:
joined_df = joined_df.join(right_df, on=list_of_join_columns)
如有任何想法,我们将不胜感激。谢谢!
一个原因是 reduce 或 fold 通常在功能上是纯粹的:每个累加操作的结果不会写入内存的同一部分,而是写入新的内存块。
原则上垃圾收集器可以在每次累加后释放前一个块,但如果不这样做,您将为累加器的每个更新版本分配内存。
只要您使用 CPython(在这种特定情况下,不同的实现可以但实际上不应该表现出明显不同的行为)。如果您看一下 reduce
implementation,您会发现它只是一个具有最少异常处理的 for 循环。
内核与你使用的循环完全等价
for element in it:
value = function(value, element)
并且没有证据支持任何特殊行为的说法。
使用 Spark 连接的帧数实际限制的额外简单测试(连接 are among the most expensive operations in Spark)
dfs = [
spark.range(10000).selectExpr(
"rand({}) AS id".format(i), "id AS value", "{} AS loop ".format(i)
)
for i in range(200)
]
显示直接 for 循环之间的时间没有显着差异
def f(dfs):
df1 = dfs[0]
for df2 in dfs[1:]:
df1 = df1.join(df2, ["id"])
return df1
%timeit -n3 f(dfs)
## 6.25 s ± 257 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)
和reduce
调用
from functools import reduce
def g(dfs):
return reduce(lambda x, y: x.join(y, ["id"]), dfs)
%timeit -n3 g(dfs)
### 6.47 s ± 455 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)
同样,整体 JVM 行为模式在 for 循环之间具有可比性
For loop CPU and Memory Usage - VisualVM
和reduce
reduce CPU and Memory Usage - VisualVM
最终都生成相同的执行计划
g(dfs)._jdf.queryExecution().optimizedPlan().equals(
f(dfs)._jdf.queryExecution().optimizedPlan()
)
## True
这表明在评估计划时没有差异并且很可能发生 OOM。
换句话说,你的相关性并不意味着因果关系,观察到的性能问题不太可能与你用来组合的方法有关 DataFrames
。
有谁知道为什么在加入多个 PySpark DataFrames 时使用 Python3 的 functools.reduce()
会导致比使用 for
循环迭代加入相同 DataFrames 更差的性能?具体来说,这会导致速度大幅下降,然后出现内存不足错误:
def join_dataframes(list_of_join_columns, left_df, right_df):
return left_df.join(right_df, on=list_of_join_columns)
joined_df = functools.reduce(
functools.partial(join_dataframes, list_of_join_columns), list_of_dataframes,
)
而这个没有:
joined_df = list_of_dataframes[0]
joined_df.cache()
for right_df in list_of_dataframes[1:]:
joined_df = joined_df.join(right_df, on=list_of_join_columns)
如有任何想法,我们将不胜感激。谢谢!
一个原因是 reduce 或 fold 通常在功能上是纯粹的:每个累加操作的结果不会写入内存的同一部分,而是写入新的内存块。
原则上垃圾收集器可以在每次累加后释放前一个块,但如果不这样做,您将为累加器的每个更新版本分配内存。
只要您使用 CPython(在这种特定情况下,不同的实现可以但实际上不应该表现出明显不同的行为)。如果您看一下 reduce
implementation,您会发现它只是一个具有最少异常处理的 for 循环。
内核与你使用的循环完全等价
for element in it:
value = function(value, element)
并且没有证据支持任何特殊行为的说法。
使用 Spark 连接的帧数实际限制的额外简单测试(连接 are among the most expensive operations in Spark)
dfs = [
spark.range(10000).selectExpr(
"rand({}) AS id".format(i), "id AS value", "{} AS loop ".format(i)
)
for i in range(200)
]
显示直接 for 循环之间的时间没有显着差异
def f(dfs):
df1 = dfs[0]
for df2 in dfs[1:]:
df1 = df1.join(df2, ["id"])
return df1
%timeit -n3 f(dfs)
## 6.25 s ± 257 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)
和reduce
调用
from functools import reduce
def g(dfs):
return reduce(lambda x, y: x.join(y, ["id"]), dfs)
%timeit -n3 g(dfs)
### 6.47 s ± 455 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)
同样,整体 JVM 行为模式在 for 循环之间具有可比性
For loop CPU and Memory Usage - VisualVM
和reduce
reduce CPU and Memory Usage - VisualVM
最终都生成相同的执行计划
g(dfs)._jdf.queryExecution().optimizedPlan().equals(
f(dfs)._jdf.queryExecution().optimizedPlan()
)
## True
这表明在评估计划时没有差异并且很可能发生 OOM。
换句话说,你的相关性并不意味着因果关系,观察到的性能问题不太可能与你用来组合的方法有关 DataFrames
。