PySpark DataFrame 上分组数据的 Pandas 式转换
Pandas-style transform of grouped data on PySpark DataFrame
如果我们有一个包含一列类别和一列值的 Pandas 数据框,我们可以通过执行以下操作删除每个类别中的均值:
df["DemeanedValues"] = df.groupby("Category")["Values"].transform(lambda g: g - numpy.mean(g))
据我了解,Spark 数据帧不直接提供此 group-by/transform 操作(我在 Spark 1.5.0 上使用 PySpark)。那么,实现这种计算的最佳方式是什么?
我试过使用 group-by/join 如下:
df2 = df.groupBy("Category").mean("Values")
df3 = df2.join(df)
但是速度非常慢,因为据我了解,每个类别都需要对 DataFrame 进行全面扫描。
我认为(但尚未证实)如果我将 group-by/mean 的结果收集到字典中,然后在 UDF 中使用该字典,我可以大大加快速度,如下所示:
nameToMean = {...}
f = lambda category, value: value - nameToMean[category]
categoryDemeaned = pyspark.sql.functions.udf(f, pyspark.sql.types.DoubleType())
df = df.withColumn("DemeanedValue", categoryDemeaned(df.Category, df.Value))
是否有一种惯用的方式来表达这种类型的操作而不牺牲性能?
I understand, each category requires a full scan of the DataFrame.
不,不是。 DataFrame 聚合是使用类似于 aggregateByKey
的逻辑执行的。请参阅 较慢的部分是 join
,它需要排序/改组。但它仍然不需要按组扫描。
如果这是一个确切的代码,您使用它会很慢,因为您没有提供连接表达式。因此,它只是执行笛卡尔积。所以它不仅低效而且不正确。你想要这样的东西:
from pyspark.sql.functions import col
means = df.groupBy("Category").mean("Values").alias("means")
df.alias("df").join(means, col("df.Category") == col("means.Category"))
I think (but have not verified) that I can speed this up a great deal if I collect the result of the group-by/mean into a dictionary, and then use that dictionary in a UDF
虽然性能会因具体情况而异,但这是可能的。使用 Python UDF 的一个问题是它必须将数据移入和移出 Python。不过,这绝对值得一试。不过,您应该考虑为 nameToMean
使用广播变量。
Is there an idiomatic way to express this type of operation without sacrificing performance?
在 PySpark 1.6 中,您可以使用 broadcast
函数:
df.alias("df").join(
broadcast(means), col("df.Category") == col("means.Category"))
但它在 <= 1.5 中不可用。
实际上,在 Spark 中有一种惯用的方法可以使用 Hive OVER
表达式。
即
df.registerTempTable('df')
with_category_means = sqlContext.sql('select *, mean(Values) OVER (PARTITION BY Category) as category_mean from df')
在幕后,这是使用 window 函数。不过,我不确定这是否比您的解决方案更快
您可以使用 Window
来做到这一点
即
import pyspark.sql.functions as F
from pyspark.sql.window import Window
window_var = Window().partitionBy('Categroy')
df = df.withColumn('DemeanedValues', F.col('Values') - F.mean('Values').over(window_var))
如果我们有一个包含一列类别和一列值的 Pandas 数据框,我们可以通过执行以下操作删除每个类别中的均值:
df["DemeanedValues"] = df.groupby("Category")["Values"].transform(lambda g: g - numpy.mean(g))
据我了解,Spark 数据帧不直接提供此 group-by/transform 操作(我在 Spark 1.5.0 上使用 PySpark)。那么,实现这种计算的最佳方式是什么?
我试过使用 group-by/join 如下:
df2 = df.groupBy("Category").mean("Values")
df3 = df2.join(df)
但是速度非常慢,因为据我了解,每个类别都需要对 DataFrame 进行全面扫描。
我认为(但尚未证实)如果我将 group-by/mean 的结果收集到字典中,然后在 UDF 中使用该字典,我可以大大加快速度,如下所示:
nameToMean = {...}
f = lambda category, value: value - nameToMean[category]
categoryDemeaned = pyspark.sql.functions.udf(f, pyspark.sql.types.DoubleType())
df = df.withColumn("DemeanedValue", categoryDemeaned(df.Category, df.Value))
是否有一种惯用的方式来表达这种类型的操作而不牺牲性能?
I understand, each category requires a full scan of the DataFrame.
不,不是。 DataFrame 聚合是使用类似于 aggregateByKey
的逻辑执行的。请参阅 join
,它需要排序/改组。但它仍然不需要按组扫描。
如果这是一个确切的代码,您使用它会很慢,因为您没有提供连接表达式。因此,它只是执行笛卡尔积。所以它不仅低效而且不正确。你想要这样的东西:
from pyspark.sql.functions import col
means = df.groupBy("Category").mean("Values").alias("means")
df.alias("df").join(means, col("df.Category") == col("means.Category"))
I think (but have not verified) that I can speed this up a great deal if I collect the result of the group-by/mean into a dictionary, and then use that dictionary in a UDF
虽然性能会因具体情况而异,但这是可能的。使用 Python UDF 的一个问题是它必须将数据移入和移出 Python。不过,这绝对值得一试。不过,您应该考虑为 nameToMean
使用广播变量。
Is there an idiomatic way to express this type of operation without sacrificing performance?
在 PySpark 1.6 中,您可以使用 broadcast
函数:
df.alias("df").join(
broadcast(means), col("df.Category") == col("means.Category"))
但它在 <= 1.5 中不可用。
实际上,在 Spark 中有一种惯用的方法可以使用 Hive OVER
表达式。
即
df.registerTempTable('df')
with_category_means = sqlContext.sql('select *, mean(Values) OVER (PARTITION BY Category) as category_mean from df')
在幕后,这是使用 window 函数。不过,我不确定这是否比您的解决方案更快
您可以使用 Window
来做到这一点
即
import pyspark.sql.functions as F
from pyspark.sql.window import Window
window_var = Window().partitionBy('Categroy')
df = df.withColumn('DemeanedValues', F.col('Values') - F.mean('Values').over(window_var))