如何使用 DataFrames 在 PySpark 中使用 window 函数?

How to use window functions in PySpark using DataFrames?

试图找出如何在 PySpark 中使用 window 函数。这是我想做的事情的一个例子,只需计算用户有 "event" 的次数(在这种情况下 "dt" 是模拟时间戳)。

from pyspark.sql.window import Window
from pyspark.sql.functions import count

df = sqlContext.createDataFrame([{"id": 123, "dt": 0}, {"id": 123, "dt": 1}, {"id": 234, "dt":0}, {"id": 456, "dt":0}, {"id": 456, "dt":1}, {"id":456, "dt":2}])
df.select(["id","dt"], count("dt").over(Window.partitionBy("id").orderBy("dt")).alias("count")).show()

这会产生错误。 window 函数的正确使用方法是什么?我读到 1.4.1(我们需要使用的版本,因为它是 AWS 上的标准版本)应该能够使用 DataFrame API.

FWIW,关于这个主题的文档非常少。我实际上很难得到任何例子 运行.

它抛出一个异常,因为你传递了一个列列表。 DataFrame.select 的签名如下所示

df.select(self, *cols)

并且使用 window 函数的表达式是一个与其他任何列一样的列,因此您在这里需要的是这样的:

w = Window.partitionBy("id").orderBy("dt") # Just for clarity
df.select("id","dt", count("dt").over(w).alias("count")).show()

## +---+---+-----+
## | id| dt|count|
## +---+---+-----+
## |234|  0|    1|
## |456|  0|    1|
## |456|  1|    2|
## |456|  2|    3|
## |123|  0|    1|
## |123|  1|    2|
## +---+---+-----+

一般来说,Spark SQL window 函数的行为与任何现代 RDBMS 中的行为完全相同。