pyspark:在 window 上计算不同
pyspark: count distinct over a window
我刚刚尝试在 window 上执行 countDistinct
并收到此错误:
AnalysisException: u'Distinct window functions are not supported:
count(distinct color#1926)
有没有办法在 pyspark 中对 window 进行不同的计数?
下面是一些示例代码:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
#function to calculate number of seconds from number of days
days = lambda i: i * 86400
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
(13, "2017-03-15T12:27:18+00:00", "red"),
(25, "2017-03-18T11:27:18+00:00", "red")],
["dollars", "timestampGMT", "color"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))
df = df.withColumn('distinct_color_count_over_the_last_week', F.countDistinct("color").over(w))
df.show()
这是我希望看到的输出:
+-------+--------------------+------+---------------------------------------+
|dollars| timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
| 17|2017-03-10 15:27:...|orange| 1|
| 13|2017-03-15 12:27:...| red| 2|
| 25|2017-03-18 11:27:...| red| 1|
+-------+--------------------+------+---------------------------------------+
编辑:
正如 noleto 在下面的回答中提到的那样,自 pyspark 2.1 以来,现在有一个 approx_count_distinct 函数可以在 window.
上运行
原答案
我发现我可以结合使用 collect_set 和 size 函数来模仿 countDistinct 在 window:
上的功能
from pyspark.sql.window import Window
from pyspark.sql import functions as F
#function to calculate number of seconds from number of days
days = lambda i: i * 86400
#create some test data
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
(13, "2017-03-15T12:27:18+00:00", "red"),
(25, "2017-03-18T11:27:18+00:00", "red")],
["dollars", "timestampGMT", "color"])
#convert string timestamp to timestamp type
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))
#use collect_set and size functions to perform countDistinct over a window
df = df.withColumn('distinct_color_count_over_the_last_week', F.size(F.collect_set("color").over(w)))
df.show()
这导致与前一周的记录不同的颜色计数:
+-------+--------------------+------+---------------------------------------+
|dollars| timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
| 17|2017-03-10 15:27:...|orange| 1|
| 13|2017-03-15 12:27:...| red| 2|
| 25|2017-03-18 11:27:...| red| 1|
+-------+--------------------+------+---------------------------------------+
@Bob Swain 的回答很好而且有效!从那时起,Spark version 2.1,Spark 提供了一个等同于 countDistinct
的函数,approx_count_distinct
使用起来效率更高,最重要的是,它支持在 window.[=15 上进行不同的计数=]
替换代码如下:
#approx_count_distinct supports a window
df = df.withColumn('distinct_color_count_over_the_last_week', F.approx_count_distinct("color").over(w))
对于基数较小的列,结果应该与 "countDistinct" 相同。当数据集增长很多时,您应该考虑调整参数 rsd
– 允许的最大估计误差,它允许您调整权衡 precision/performance.
我刚刚尝试在 window 上执行 countDistinct
并收到此错误:
AnalysisException: u'Distinct window functions are not supported: count(distinct color#1926)
有没有办法在 pyspark 中对 window 进行不同的计数?
下面是一些示例代码:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
#function to calculate number of seconds from number of days
days = lambda i: i * 86400
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
(13, "2017-03-15T12:27:18+00:00", "red"),
(25, "2017-03-18T11:27:18+00:00", "red")],
["dollars", "timestampGMT", "color"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))
df = df.withColumn('distinct_color_count_over_the_last_week', F.countDistinct("color").over(w))
df.show()
这是我希望看到的输出:
+-------+--------------------+------+---------------------------------------+
|dollars| timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
| 17|2017-03-10 15:27:...|orange| 1|
| 13|2017-03-15 12:27:...| red| 2|
| 25|2017-03-18 11:27:...| red| 1|
+-------+--------------------+------+---------------------------------------+
编辑:
正如 noleto 在下面的回答中提到的那样,自 pyspark 2.1 以来,现在有一个 approx_count_distinct 函数可以在 window.
上运行原答案
我发现我可以结合使用 collect_set 和 size 函数来模仿 countDistinct 在 window:
上的功能from pyspark.sql.window import Window
from pyspark.sql import functions as F
#function to calculate number of seconds from number of days
days = lambda i: i * 86400
#create some test data
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
(13, "2017-03-15T12:27:18+00:00", "red"),
(25, "2017-03-18T11:27:18+00:00", "red")],
["dollars", "timestampGMT", "color"])
#convert string timestamp to timestamp type
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))
#use collect_set and size functions to perform countDistinct over a window
df = df.withColumn('distinct_color_count_over_the_last_week', F.size(F.collect_set("color").over(w)))
df.show()
这导致与前一周的记录不同的颜色计数:
+-------+--------------------+------+---------------------------------------+
|dollars| timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
| 17|2017-03-10 15:27:...|orange| 1|
| 13|2017-03-15 12:27:...| red| 2|
| 25|2017-03-18 11:27:...| red| 1|
+-------+--------------------+------+---------------------------------------+
@Bob Swain 的回答很好而且有效!从那时起,Spark version 2.1,Spark 提供了一个等同于 countDistinct
的函数,approx_count_distinct
使用起来效率更高,最重要的是,它支持在 window.[=15 上进行不同的计数=]
替换代码如下:
#approx_count_distinct supports a window
df = df.withColumn('distinct_color_count_over_the_last_week', F.approx_count_distinct("color").over(w))
对于基数较小的列,结果应该与 "countDistinct" 相同。当数据集增长很多时,您应该考虑调整参数 rsd
– 允许的最大估计误差,它允许您调整权衡 precision/performance.