User defined function to be applied to Window in PySpark?
I am trying to apply a user defined function to Window in PySpark. I have read that a UDAF might be the way to go, but I was not able to find anything concrete.
To give an example (taken from here: Xinh's Tech Blog and modified for PySpark):
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

spark = SparkSession.builder.master("local").config(conf=SparkConf()).getOrCreate()

a = spark.createDataFrame([[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]], ['ind', "state"])

customers = spark.createDataFrame([["Alice", "2016-05-01", 50.00],
                                   ["Alice", "2016-05-03", 45.00],
                                   ["Alice", "2016-05-04", 55.00],
                                   ["Bob", "2016-05-01", 25.00],
                                   ["Bob", "2016-05-04", 29.00],
                                   ["Bob", "2016-05-06", 27.00]],
                                  ["name", "date", "amountSpent"])

customers.show()

window_spec = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)

result = customers.withColumn("movingAvg", avg(customers["amountSpent"]).over(window_spec))

result.show()
Here I am applying avg, which is already built into pyspark.sql.functions, but what if I wanted to use something more complicated than avg and write my own function? How would I do that?
Spark >= 3.0:
SPARK-24561 - User-defined window functions with pandas udf (bounded window) is a work in progress. Please follow the related JIRA for details.
Spark >= 2.4:
SPARK-22239 - User-defined window functions with pandas udf (unbounded window) introduced support for Pandas-based window functions with unbounded windows. The general structure is
return_type: DataType

@pandas_udf(return_type, PandasUDFType.GROUPED_AGG)
def f(v):
    return ...

w = (Window
     .partitionBy(grouping_column)
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

df.withColumn('foo', f('bar').over(w))
See the doctests and the unit tests for detailed examples.
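As a concrete sketch of that structure, reusing the question's customers DataFrame (the median aggregation and the medianSpent column name are illustrative assumptions, not part of the JIRA):

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.window import Window

# GROUPED_AGG pandas UDF: receives the window's values as a pandas Series
# and returns a single scalar for every row of that window.
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def median_spent(v):
    return float(v.median())

# Unbounded window: each row sees its whole partition.
w = (Window
     .partitionBy("name")
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

customers.withColumn("medianSpent", median_spent("amountSpent").over(w)).show()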
Spark < 2.4
You cannot. Window functions require UserDefinedAggregateFunction or an equivalent object, not UserDefinedFunction, and it is not possible to define one in PySpark.
However, in PySpark 2.3 or later you can define a vectorized pandas_udf, which can be applied on grouped data. You can find a working example in Applying UDFs on GroupedData in PySpark (with functioning python example). While Pandas doesn't provide a direct equivalent of window functions, it is expressive enough to implement any window-like logic, especially with pandas.DataFrame.rolling. Furthermore, a function used with GroupedData.apply can return an arbitrary number of rows, as sketched below.
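A rough sketch of that grouped-data approach for PySpark 2.3, again reusing the question's customers DataFrame (the output schema and the moving_avg helper are illustrative assumptions, not a fixed API):

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# GROUPED_MAP pandas UDF: each customer's rows arrive as one pandas DataFrame,
# so plain pandas tooling such as rolling() can express window-like logic.
@pandas_udf("name string, date string, amountSpent double, movingAvg double",
            PandasUDFType.GROUPED_MAP)
def moving_avg(pdf):
    pdf = pdf.sort_values("date")
    # A centered 3-row rolling mean roughly mirrors rowsBetween(-1, 1).
    pdf["movingAvg"] = (pdf["amountSpent"]
                        .rolling(window=3, min_periods=1, center=True)
                        .mean())
    return pdf

customers.groupBy("name").apply(moving_avg).show()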
You can also call a Scala UDAF from PySpark: Spark: How to map Python with Scala or Java User Defined Functions?.
Since Spark 3.0.0, UDFs can now be applied on Window.
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.pandas_udf.html
Extract from the documentation:
import pandas as pd
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

w = Window.partitionBy('id').orderBy('v').rowsBetween(-1, 0)

df.withColumn('mean_v', mean_udf("v").over(w)).show()
+---+----+------+
| id| v|mean_v|
+---+----+------+
| 1| 1.0| 1.0|
| 1| 2.0| 1.5|
| 2| 3.0| 3.0|
| 2| 5.0| 4.0|
| 2|10.0| 7.5|
+---+----+------+
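Coming back to the original question, the same Spark 3.0+ mechanism accepts arbitrary pandas logic, not just built-ins. A minimal sketch of a custom aggregation over the question's moving window (the midrange computation and the customAgg column name are illustrative assumptions):

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.window import Window

# Any pandas Series -> scalar computation can be used here.
@pandas_udf("double")
def midrange(v: pd.Series) -> float:
    # Illustrative custom logic that no single built-in aggregate covers.
    return float((v.max() + v.min()) / 2)

window_spec = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)

customers.withColumn("customAgg", midrange("amountSpent").over(window_spec)).show()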