在Pyspark上模拟UDAF进行封装

Simulating UDAF on Pyspark for encapsulation

我正在使用 PySpark 学习 Spark,但在尝试使事情变得更干净时碰壁了。

假设有一个看起来像这样的数据框。 (当然,有更多的列和行)

A | B |   C
--+---+------
a | 1 | 1.300
a | 2 | 2.500
a | 3 | 1.000
b | 1 | 120.0
b | 4 | 34.20
c | 2 | 3.442

我想在上面 运行 一堆 groupby -> agg,使用基本的 pyspark.sql.functions ,比如 count()mean(),像这样:

df.groupby("A")\
    .agg(mean("B").alias("B_mean"),
         sum("C").alias("C_sum"),
         (countDistinct("B")/avg("C")).alias("New_metric"))

它工作正常,运行速度相对较快,并且给了我想要的结果。

但是,最终,将需要稍微复杂一些的功能,而且,我们还想让这些功能更易于测试。

如何封装这些功能?使用 lambda?绕过 UDF 的一些方法?

我知道 UDAF,并且可以在 SCALA 中编写它们并将代码导入 PySpark,但是,由于我们所有的代码库都已经在 Python 中,我想探索其他选项。

P.S.: 我们是运行ning Spark 1.6.0

函数可以定义为pyspark.sql.functions:

的组合
  • 是 - 走这边。例如:

    def sum_of_squares(col):
        return sum(col * col)
    
    df.select(sum_of_squares(df["foo"]])
    
    df.groupBy("foo").agg(sum_of_squares(df["bar"]])
    
  • 否 - 使用 RDD。