PySpark groupBy 中的中位数/分位数
Median / quantiles within PySpark groupBy
我想计算 Spark 数据帧上的组分位数(使用 PySpark)。近似或精确的结果都可以。我更喜欢可以在 groupBy
/ agg
上下文中使用的解决方案,这样我就可以将它与其他 PySpark 聚合函数混合使用。如果由于某种原因无法做到这一点,也可以使用其他方法。
相关但未说明如何将 approxQuantile
用作聚合函数。
我也可以访问 percentile_approx
Hive UDF,但我不知道如何将它用作聚合函数。
为了具体起见,假设我有以下数据框:
from pyspark import SparkContext
import pyspark.sql.functions as f
sc = SparkContext()
df = sc.parallelize([
['A', 1],
['A', 2],
['A', 3],
['B', 4],
['B', 5],
['B', 6],
]).toDF(('grp', 'val'))
df_grp = df.groupBy('grp').agg(f.magic_percentile('val', 0.5).alias('med_val'))
df_grp.show()
预期结果是:
+----+-------+
| grp|med_val|
+----+-------+
| A| 2|
| B| 5|
+----+-------+
由于您可以访问 percentile_approx
,一个简单的解决方案是在 SQL
命令中使用它:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df.registerTempTable("df")
df2 = sqlContext.sql("select grp, percentile_approx(val, 0.5) as med_val from df group by grp")
(更新:现在 是 可能的,请参阅上面接受的答案)
不幸的是,据我所知,似乎无法使用“纯”PySpark 命令执行此操作(Shaido 的解决方案提供了 SQL 的解决方法),以及原因非常初级:与其他聚合函数相比,例如 mean
,approxQuantile
不是 return 一个 Column
类型,而是一个 list.
让我们看一个使用您的示例数据的简单示例:
spark.version
# u'2.2.0'
import pyspark.sql.functions as func
from pyspark.sql import DataFrameStatFunctions as statFunc
# aggregate with mean works OK:
df_grp_mean = df.groupBy('grp').agg(func.mean(df['val']).alias('mean_val'))
df_grp_mean.show()
# +---+--------+
# |grp|mean_val|
# +---+--------+
# | B| 5.0|
# | A| 2.0|
# +---+--------+
# try aggregating by median:
df_grp_med = df.groupBy('grp').agg(statFunc(df).approxQuantile('val', [0.5], 0.1))
# AssertionError: all exprs should be Column
# mean aggregation is a Column, but median is a list:
type(func.mean(df['val']))
# pyspark.sql.column.Column
type(statFunc(df).approxQuantile('val', [0.5], 0.1))
# list
我怀疑基于 window 的方法是否会有任何不同,因为正如我所说,根本原因是非常基本的。
另请参阅 了解更多详细信息。
我猜你不再需要它了。但会留在这里留给子孙后代(也就是我下周忘记的时候)。
from pyspark.sql import Window
import pyspark.sql.functions as F
grp_window = Window.partitionBy('grp')
magic_percentile = F.expr('percentile_approx(val, 0.5)')
df.withColumn('med_val', magic_percentile.over(grp_window))
或者为了准确解决您的问题,这也有效:
df.groupBy('grp').agg(magic_percentile.alias('med_val'))
作为奖励,您可以传递一组百分位数:
quantiles = F.expr('percentile_approx(val, array(0.25, 0.5, 0.75))')
你会在 return 中得到一个列表。
使用 pyspark==2.4.5
执行此操作的最简单方法是:
df \
.groupby('grp') \
.agg(expr('percentile(val, array(0.5))')[0].alias('50%')) \
.show()
输出:
|grp|50%|
+---+---+
| B|5.0|
| A|2.0|
+---+---+
“percentile_approx(val, 0.5)”的问题:
如果例如范围是 [1,2,3,4] 这个函数 returns 2(作为中值)下面的函数 returns 2.5:
import statistics
median_udf = F.udf(lambda x: statistics.median(x) if bool(x) else None, DoubleType())
... .groupBy('something').agg(median_udf(F.collect_list(F.col('value'))).alias('median'))
好像pyspark >= 3.1.0
用percentile_approx
完全解决了
import pyspark.sql.functions as func
df.groupBy("grp").agg(func.percentile_approx("val", 0.5).alias("median"))
我想计算 Spark 数据帧上的组分位数(使用 PySpark)。近似或精确的结果都可以。我更喜欢可以在 groupBy
/ agg
上下文中使用的解决方案,这样我就可以将它与其他 PySpark 聚合函数混合使用。如果由于某种原因无法做到这一点,也可以使用其他方法。
approxQuantile
用作聚合函数。
我也可以访问 percentile_approx
Hive UDF,但我不知道如何将它用作聚合函数。
为了具体起见,假设我有以下数据框:
from pyspark import SparkContext
import pyspark.sql.functions as f
sc = SparkContext()
df = sc.parallelize([
['A', 1],
['A', 2],
['A', 3],
['B', 4],
['B', 5],
['B', 6],
]).toDF(('grp', 'val'))
df_grp = df.groupBy('grp').agg(f.magic_percentile('val', 0.5).alias('med_val'))
df_grp.show()
预期结果是:
+----+-------+
| grp|med_val|
+----+-------+
| A| 2|
| B| 5|
+----+-------+
由于您可以访问 percentile_approx
,一个简单的解决方案是在 SQL
命令中使用它:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df.registerTempTable("df")
df2 = sqlContext.sql("select grp, percentile_approx(val, 0.5) as med_val from df group by grp")
(更新:现在 是 可能的,请参阅上面接受的答案)
不幸的是,据我所知,似乎无法使用“纯”PySpark 命令执行此操作(Shaido 的解决方案提供了 SQL 的解决方法),以及原因非常初级:与其他聚合函数相比,例如 mean
,approxQuantile
不是 return 一个 Column
类型,而是一个 list.
让我们看一个使用您的示例数据的简单示例:
spark.version
# u'2.2.0'
import pyspark.sql.functions as func
from pyspark.sql import DataFrameStatFunctions as statFunc
# aggregate with mean works OK:
df_grp_mean = df.groupBy('grp').agg(func.mean(df['val']).alias('mean_val'))
df_grp_mean.show()
# +---+--------+
# |grp|mean_val|
# +---+--------+
# | B| 5.0|
# | A| 2.0|
# +---+--------+
# try aggregating by median:
df_grp_med = df.groupBy('grp').agg(statFunc(df).approxQuantile('val', [0.5], 0.1))
# AssertionError: all exprs should be Column
# mean aggregation is a Column, but median is a list:
type(func.mean(df['val']))
# pyspark.sql.column.Column
type(statFunc(df).approxQuantile('val', [0.5], 0.1))
# list
我怀疑基于 window 的方法是否会有任何不同,因为正如我所说,根本原因是非常基本的。
另请参阅
我猜你不再需要它了。但会留在这里留给子孙后代(也就是我下周忘记的时候)。
from pyspark.sql import Window
import pyspark.sql.functions as F
grp_window = Window.partitionBy('grp')
magic_percentile = F.expr('percentile_approx(val, 0.5)')
df.withColumn('med_val', magic_percentile.over(grp_window))
或者为了准确解决您的问题,这也有效:
df.groupBy('grp').agg(magic_percentile.alias('med_val'))
作为奖励,您可以传递一组百分位数:
quantiles = F.expr('percentile_approx(val, array(0.25, 0.5, 0.75))')
你会在 return 中得到一个列表。
使用 pyspark==2.4.5
执行此操作的最简单方法是:
df \
.groupby('grp') \
.agg(expr('percentile(val, array(0.5))')[0].alias('50%')) \
.show()
输出:
|grp|50%|
+---+---+
| B|5.0|
| A|2.0|
+---+---+
“percentile_approx(val, 0.5)”的问题: 如果例如范围是 [1,2,3,4] 这个函数 returns 2(作为中值)下面的函数 returns 2.5:
import statistics
median_udf = F.udf(lambda x: statistics.median(x) if bool(x) else None, DoubleType())
... .groupBy('something').agg(median_udf(F.collect_list(F.col('value'))).alias('median'))
好像pyspark >= 3.1.0
用percentile_approx
import pyspark.sql.functions as func
df.groupBy("grp").agg(func.percentile_approx("val", 0.5).alias("median"))