在Pyspark上模拟UDAF进行封装
Simulating UDAF on Pyspark for encapsulation
我正在使用 PySpark 学习 Spark,但在尝试使事情变得更干净时碰壁了。
假设有一个看起来像这样的数据框。 (当然,有更多的列和行)
A | B | C
--+---+------
a | 1 | 1.300
a | 2 | 2.500
a | 3 | 1.000
b | 1 | 120.0
b | 4 | 34.20
c | 2 | 3.442
我想在上面 运行 一堆 groupby -> agg
,使用基本的 pyspark.sql.functions
,比如 count()
和 mean()
,像这样:
df.groupby("A")\
.agg(mean("B").alias("B_mean"),
sum("C").alias("C_sum"),
(countDistinct("B")/avg("C")).alias("New_metric"))
它工作正常,运行速度相对较快,并且给了我想要的结果。
但是,最终,将需要稍微复杂一些的功能,而且,我们还想让这些功能更易于测试。
如何封装这些功能?使用 lambda
?绕过 UDF 的一些方法?
我知道 UDAF,并且可以在 SCALA 中编写它们并将代码导入 PySpark,但是,由于我们所有的代码库都已经在 Python 中,我想探索其他选项。
P.S.: 我们是运行ning Spark 1.6.0
函数可以定义为pyspark.sql.functions
:
的组合
是 - 走这边。例如:
def sum_of_squares(col):
return sum(col * col)
df.select(sum_of_squares(df["foo"]])
df.groupBy("foo").agg(sum_of_squares(df["bar"]])
否 - 使用 RDD。
我正在使用 PySpark 学习 Spark,但在尝试使事情变得更干净时碰壁了。
假设有一个看起来像这样的数据框。 (当然,有更多的列和行)
A | B | C
--+---+------
a | 1 | 1.300
a | 2 | 2.500
a | 3 | 1.000
b | 1 | 120.0
b | 4 | 34.20
c | 2 | 3.442
我想在上面 运行 一堆 groupby -> agg
,使用基本的 pyspark.sql.functions
,比如 count()
和 mean()
,像这样:
df.groupby("A")\
.agg(mean("B").alias("B_mean"),
sum("C").alias("C_sum"),
(countDistinct("B")/avg("C")).alias("New_metric"))
它工作正常,运行速度相对较快,并且给了我想要的结果。
但是,最终,将需要稍微复杂一些的功能,而且,我们还想让这些功能更易于测试。
如何封装这些功能?使用 lambda
?绕过 UDF 的一些方法?
我知道 UDAF,并且可以在 SCALA 中编写它们并将代码导入 PySpark,但是,由于我们所有的代码库都已经在 Python 中,我想探索其他选项。
P.S.: 我们是运行ning Spark 1.6.0
函数可以定义为pyspark.sql.functions
:
是 - 走这边。例如:
def sum_of_squares(col): return sum(col * col) df.select(sum_of_squares(df["foo"]]) df.groupBy("foo").agg(sum_of_squares(df["bar"]])
否 - 使用 RDD。