pyspark approxQuantile 函数
pyspark approxQuantile function
我有包含这些列的数据框 id
、price
、timestamp
。
我想找到按 id
分组的中值。
我正在使用此代码查找它,但它给了我这个错误。
from pyspark.sql import DataFrameStatFunctions as statFunc
windowSpec = Window.partitionBy("id")
median = statFunc.approxQuantile("price",
[0.5],
0) \
.over(windowSpec)
return df.withColumn("Median", median)
是否无法使用 DataFrameStatFunctions
在新列中填充值?
TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead)
嗯,确实不可能可以使用approxQuantile
来填充新数据框列中的值,但这不是您收到此错误的原因。不幸的是,整个背后的故事是一个相当令人沮丧的故事,因为 I have argued 许多 Spark(尤其是 PySpark)功能就是这种情况,而且它们缺乏足够的文档。
首先,没有一种,而是两种 approxQuantile
种方法; first one 是标准 DataFrame class 的一部分,即您不需要导入 DataFrameStatFunctions:
spark.version
# u'2.1.1'
sampleData = [("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)]
df = spark.createDataFrame(sampleData, schema=["Name","Role","Salary"])
df.show()
# +------+---------+------+
# | Name| Role|Salary|
# +------+---------+------+
# | bob|Developer|125000|
# | mark|Developer|108000|
# | carl| Tester| 70000|
# | peter|Developer|185000|
# | jon| Tester| 65000|
# | roman| Tester| 82000|
# | simon|Developer| 98000|
# | eric|Developer|144000|
# |carlos| Tester| 75000|
# | henry|Developer|110000|
# +------+---------+------+
med = df.approxQuantile("Salary", [0.5], 0.25) # no need to import DataFrameStatFunctions
med
# [98000.0]
The second one是DataFrameStatFunctions
的一部分,但是如果你照着用,就会报错:
from pyspark.sql import DataFrameStatFunctions as statFunc
med2 = statFunc.approxQuantile( "Salary", [0.5], 0.25)
# TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead)
因为正确的用法是
med2 = statFunc(df).approxQuantile( "Salary", [0.5], 0.25)
med2
# [82000.0]
虽然您无法在 PySpark 文档中找到关于此的简单示例(我花了一些时间自己弄清楚)...最好的部分?两个值是不等于:
med == med2
# False
我怀疑这是由于使用了非确定性算法(毕竟,它应该是一个近似值中值),即使你重新运行 具有相同玩具数据的命令你可能会得到不同的值(并且与我在这里报告的不同) - 我建议进行一些实验以获得感觉......
但是,正如我已经说过的,这不是您不能使用 approxQuantile
在新数据框列中填充值的原因 - 即使您使用正确的语法,您也会得到不同的错误:
df2 = df.withColumn('median_salary', statFunc(df).approxQuantile( "Salary", [0.5], 0.25))
# AssertionError: col should be Column
这里,col
指的是withColumn
操作的第二个参数,即approxQuantile
,错误信息说它不是Column
类型 - 事实上,它是一个列表:
type(statFunc(df).approxQuantile( "Salary", [0.5], 0.25))
# list
因此,在填充列值时,Spark 需要 Column
类型的参数,而您不能使用列表;下面是创建一个新列的示例,每个角色的平均值而不是中值:
import pyspark.sql.functions as func
from pyspark.sql import Window
windowSpec = Window.partitionBy(df['Role'])
df2 = df.withColumn('mean_salary', func.mean(df['Salary']).over(windowSpec))
df2.show()
# +------+---------+------+------------------+
# | Name| Role|Salary| mean_salary|
# +------+---------+------+------------------+
# | carl| Tester| 70000| 73000.0|
# | jon| Tester| 65000| 73000.0|
# | roman| Tester| 82000| 73000.0|
# |carlos| Tester| 75000| 73000.0|
# | bob|Developer|125000|128333.33333333333|
# | mark|Developer|108000|128333.33333333333|
# | peter|Developer|185000|128333.33333333333|
# | simon|Developer| 98000|128333.33333333333|
# | eric|Developer|144000|128333.33333333333|
# | henry|Developer|110000|128333.33333333333|
# +------+---------+------+------------------+
之所以有效,是因为与 approxQuantile
相反,mean
returns a Column
:
type(func.mean(df['Salary']).over(windowSpec))
# pyspark.sql.column.Column
计算组中的分位数(汇总)示例
由于组缺少聚合函数,我添加了一个按名称构造函数调用的示例(percentile_approx
本例):
from pyspark.sql.column import Column, _to_java_column, _to_seq
def from_name(sc, func_name, *params):
"""
create call by function name
"""
callUDF = sc._jvm.org.apache.spark.sql.functions.callUDF
func = callUDF(func_name, _to_seq(sc, *params, _to_java_column))
return Column(func)
在groupBy中应用percentile_approx
函数:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# build percentile_approx function call by name:
target = from_name(sc, "percentile_approx", [f.col("salary"), f.lit(0.95)])
# load dataframe for persons data
# with columns "person_id", "group_id" and "salary"
persons = spark.read.parquet( ... )
# apply function for each group
persons.groupBy("group_id").agg(
target.alias("target")).show()
如果您可以使用聚合而不是 window 函数,也可以选择使用 pandas_udf。尽管它们不如纯 Spark 快。这是来自 docs:
的改编示例
from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "price")
)
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def median_udf(v):
return v.median()
df.groupby("id").agg(median_udf(df["price"])).show()
从 PySpark 3.1.0 开始,引入了 percentile_approx
函数来解决这个问题。
函数percentile_approx
returns一个列表,因此需要对第一个元素进行切片。
如:
windowSpec = Window.partitionBy("id")
df.withColumn("Median", F.percentile_approx("price", [0.5]).over(windowSpec)[0])
我有包含这些列的数据框 id
、price
、timestamp
。
我想找到按 id
分组的中值。
我正在使用此代码查找它,但它给了我这个错误。
from pyspark.sql import DataFrameStatFunctions as statFunc
windowSpec = Window.partitionBy("id")
median = statFunc.approxQuantile("price",
[0.5],
0) \
.over(windowSpec)
return df.withColumn("Median", median)
是否无法使用 DataFrameStatFunctions
在新列中填充值?
TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead)
嗯,确实不可能可以使用approxQuantile
来填充新数据框列中的值,但这不是您收到此错误的原因。不幸的是,整个背后的故事是一个相当令人沮丧的故事,因为 I have argued 许多 Spark(尤其是 PySpark)功能就是这种情况,而且它们缺乏足够的文档。
首先,没有一种,而是两种 approxQuantile
种方法; first one 是标准 DataFrame class 的一部分,即您不需要导入 DataFrameStatFunctions:
spark.version
# u'2.1.1'
sampleData = [("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)]
df = spark.createDataFrame(sampleData, schema=["Name","Role","Salary"])
df.show()
# +------+---------+------+
# | Name| Role|Salary|
# +------+---------+------+
# | bob|Developer|125000|
# | mark|Developer|108000|
# | carl| Tester| 70000|
# | peter|Developer|185000|
# | jon| Tester| 65000|
# | roman| Tester| 82000|
# | simon|Developer| 98000|
# | eric|Developer|144000|
# |carlos| Tester| 75000|
# | henry|Developer|110000|
# +------+---------+------+
med = df.approxQuantile("Salary", [0.5], 0.25) # no need to import DataFrameStatFunctions
med
# [98000.0]
The second one是DataFrameStatFunctions
的一部分,但是如果你照着用,就会报错:
from pyspark.sql import DataFrameStatFunctions as statFunc
med2 = statFunc.approxQuantile( "Salary", [0.5], 0.25)
# TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead)
因为正确的用法是
med2 = statFunc(df).approxQuantile( "Salary", [0.5], 0.25)
med2
# [82000.0]
虽然您无法在 PySpark 文档中找到关于此的简单示例(我花了一些时间自己弄清楚)...最好的部分?两个值是不等于:
med == med2
# False
我怀疑这是由于使用了非确定性算法(毕竟,它应该是一个近似值中值),即使你重新运行 具有相同玩具数据的命令你可能会得到不同的值(并且与我在这里报告的不同) - 我建议进行一些实验以获得感觉......
但是,正如我已经说过的,这不是您不能使用 approxQuantile
在新数据框列中填充值的原因 - 即使您使用正确的语法,您也会得到不同的错误:
df2 = df.withColumn('median_salary', statFunc(df).approxQuantile( "Salary", [0.5], 0.25))
# AssertionError: col should be Column
这里,col
指的是withColumn
操作的第二个参数,即approxQuantile
,错误信息说它不是Column
类型 - 事实上,它是一个列表:
type(statFunc(df).approxQuantile( "Salary", [0.5], 0.25))
# list
因此,在填充列值时,Spark 需要 Column
类型的参数,而您不能使用列表;下面是创建一个新列的示例,每个角色的平均值而不是中值:
import pyspark.sql.functions as func
from pyspark.sql import Window
windowSpec = Window.partitionBy(df['Role'])
df2 = df.withColumn('mean_salary', func.mean(df['Salary']).over(windowSpec))
df2.show()
# +------+---------+------+------------------+
# | Name| Role|Salary| mean_salary|
# +------+---------+------+------------------+
# | carl| Tester| 70000| 73000.0|
# | jon| Tester| 65000| 73000.0|
# | roman| Tester| 82000| 73000.0|
# |carlos| Tester| 75000| 73000.0|
# | bob|Developer|125000|128333.33333333333|
# | mark|Developer|108000|128333.33333333333|
# | peter|Developer|185000|128333.33333333333|
# | simon|Developer| 98000|128333.33333333333|
# | eric|Developer|144000|128333.33333333333|
# | henry|Developer|110000|128333.33333333333|
# +------+---------+------+------------------+
之所以有效,是因为与 approxQuantile
相反,mean
returns a Column
:
type(func.mean(df['Salary']).over(windowSpec))
# pyspark.sql.column.Column
计算组中的分位数(汇总)示例
由于组缺少聚合函数,我添加了一个按名称构造函数调用的示例(percentile_approx
本例):
from pyspark.sql.column import Column, _to_java_column, _to_seq
def from_name(sc, func_name, *params):
"""
create call by function name
"""
callUDF = sc._jvm.org.apache.spark.sql.functions.callUDF
func = callUDF(func_name, _to_seq(sc, *params, _to_java_column))
return Column(func)
在groupBy中应用percentile_approx
函数:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# build percentile_approx function call by name:
target = from_name(sc, "percentile_approx", [f.col("salary"), f.lit(0.95)])
# load dataframe for persons data
# with columns "person_id", "group_id" and "salary"
persons = spark.read.parquet( ... )
# apply function for each group
persons.groupBy("group_id").agg(
target.alias("target")).show()
如果您可以使用聚合而不是 window 函数,也可以选择使用 pandas_udf。尽管它们不如纯 Spark 快。这是来自 docs:
的改编示例from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "price")
)
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def median_udf(v):
return v.median()
df.groupby("id").agg(median_udf(df["price"])).show()
从 PySpark 3.1.0 开始,引入了 percentile_approx
函数来解决这个问题。
函数percentile_approx
returns一个列表,因此需要对第一个元素进行切片。
如:
windowSpec = Window.partitionBy("id")
df.withColumn("Median", F.percentile_approx("price", [0.5]).over(windowSpec)[0])