根据以前的值对列中的 Spark 值进行排名
Rank values in Spark on a column based on previous values
我有这样一个数据框:
df = spark.createDataFrame(
[
(dt.datetime(2021, 5, 1, 10, 30, 0), 2.15, "a"),
(dt.datetime(2021, 5, 1, 10, 30, 10), 2.12, "a"),
(dt.datetime(2021, 5, 1, 10, 30, 20), 2.13, "a"),
(dt.datetime(2021, 5, 1, 10, 30, 50), 2.14, "a"),
(dt.datetime(2021, 5, 1, 10, 31, 5), 2.13, "a"),
(dt.datetime(2021, 5, 1, 10, 31, 10), 2.16, "a"),
(dt.datetime(2021, 5, 1, 10, 31, 10), 2.16, "b"),
],
["ts", "value", "group"]
)
我想获取值列的排名,使用所有以前的值(按时间戳 ts 排序)。例如:
+-------------------+-----+-----+----+
| ts|value|group|rank|
+-------------------+-----+-----+----+
|2021-05-01 10:30:00| 2.15| a| 1|
|2021-05-01 10:30:10| 2.12| a| 1|
|2021-05-01 10:30:20| 2.13| a| 2|
|2021-05-01 10:30:50| 2.14| a| 3|
|2021-05-01 10:31:05| 2.13| a| 2|
|2021-05-01 10:31:10| 2.16| a| 5|
|2021-05-01 10:31:10| 2.16| b| 1|
+-------------------+-----+-----+----+
我尝试了以下代码:
w = (
Window
.partitionBy("group")
.orderBy("ts")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df.select(
"*",
f.rank().over(w).alias("rank")
).show()
但基本上只根据时间戳对列进行排名。
知道怎么做吗?
将您的 orderBy()
列更改为 value
import datetime as dt
df = spark.createDataFrame(
[
(dt.datetime(2021, 5, 1, 10, 30, 0), 2.15, "a"),
(dt.datetime(2021, 5, 1, 10, 30, 10), 2.12, "a"),
(dt.datetime(2021, 5, 1, 10, 30, 20), 2.13, "a"),
(dt.datetime(2021, 5, 1, 10, 30, 50), 2.14, "a"),
(dt.datetime(2021, 5, 1, 10, 31, 5), 2.13, "a"),
(dt.datetime(2021, 5, 1, 10, 31, 10), 2.16, "b"),
(dt.datetime(2021, 5, 1, 10, 31, 11), 2.17, "b"),
],
["ts", "value", "group"]
)
w = (
W
.partitionBy("group")
.orderBy("value")
)
df.select(
"*",
F.rank().over(w).alias("rank")
).show()
+-------------------+-----+-----+----+
| ts|value|group|rank|
+-------------------+-----+-----+----+
|2021-05-01 10:30:10| 2.12| a| 1|
|2021-05-01 10:30:20| 2.13| a| 2|
|2021-05-01 10:31:05| 2.13| a| 2|
|2021-05-01 10:30:50| 2.14| a| 4|
|2021-05-01 10:30:00| 2.15| a| 5|
|2021-05-01 10:31:10| 2.16| b| 1|
|2021-05-01 10:31:11| 2.17| b| 2|
+-------------------+-----+-----+----+
rank
函数按 orderBy
子句对数据进行排名,因此您不能按其他列对数据进行排名。您可以使用它作为替代方案
df = (df
.withColumn("rank", F.array_sort(F.collect_set('value').over(w)))
.withColumn('rank', F.expr("array_position(rank, value)")))
df.show()
+-------------------+-----+-----+----+
| ts|value|group|rank|
+-------------------+-----+-----+----+
|2021-05-01 10:31:10| 2.16| b| 1|
|2021-05-01 10:30:00| 2.15| a| 1|
|2021-05-01 10:30:10| 2.12| a| 1|
|2021-05-01 10:30:20| 2.13| a| 2|
|2021-05-01 10:30:50| 2.14| a| 3|
|2021-05-01 10:31:05| 2.13| a| 2|
|2021-05-01 10:31:10| 2.16| a| 5|
+-------------------+-----+-----+----+
如果你想获得dense_rank
,使用collect_list
我有这样一个数据框:
df = spark.createDataFrame(
[
(dt.datetime(2021, 5, 1, 10, 30, 0), 2.15, "a"),
(dt.datetime(2021, 5, 1, 10, 30, 10), 2.12, "a"),
(dt.datetime(2021, 5, 1, 10, 30, 20), 2.13, "a"),
(dt.datetime(2021, 5, 1, 10, 30, 50), 2.14, "a"),
(dt.datetime(2021, 5, 1, 10, 31, 5), 2.13, "a"),
(dt.datetime(2021, 5, 1, 10, 31, 10), 2.16, "a"),
(dt.datetime(2021, 5, 1, 10, 31, 10), 2.16, "b"),
],
["ts", "value", "group"]
)
我想获取值列的排名,使用所有以前的值(按时间戳 ts 排序)。例如:
+-------------------+-----+-----+----+
| ts|value|group|rank|
+-------------------+-----+-----+----+
|2021-05-01 10:30:00| 2.15| a| 1|
|2021-05-01 10:30:10| 2.12| a| 1|
|2021-05-01 10:30:20| 2.13| a| 2|
|2021-05-01 10:30:50| 2.14| a| 3|
|2021-05-01 10:31:05| 2.13| a| 2|
|2021-05-01 10:31:10| 2.16| a| 5|
|2021-05-01 10:31:10| 2.16| b| 1|
+-------------------+-----+-----+----+
我尝试了以下代码:
w = (
Window
.partitionBy("group")
.orderBy("ts")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df.select(
"*",
f.rank().over(w).alias("rank")
).show()
但基本上只根据时间戳对列进行排名。
知道怎么做吗?
将您的 orderBy()
列更改为 value
import datetime as dt
df = spark.createDataFrame(
[
(dt.datetime(2021, 5, 1, 10, 30, 0), 2.15, "a"),
(dt.datetime(2021, 5, 1, 10, 30, 10), 2.12, "a"),
(dt.datetime(2021, 5, 1, 10, 30, 20), 2.13, "a"),
(dt.datetime(2021, 5, 1, 10, 30, 50), 2.14, "a"),
(dt.datetime(2021, 5, 1, 10, 31, 5), 2.13, "a"),
(dt.datetime(2021, 5, 1, 10, 31, 10), 2.16, "b"),
(dt.datetime(2021, 5, 1, 10, 31, 11), 2.17, "b"),
],
["ts", "value", "group"]
)
w = (
W
.partitionBy("group")
.orderBy("value")
)
df.select(
"*",
F.rank().over(w).alias("rank")
).show()
+-------------------+-----+-----+----+
| ts|value|group|rank|
+-------------------+-----+-----+----+
|2021-05-01 10:30:10| 2.12| a| 1|
|2021-05-01 10:30:20| 2.13| a| 2|
|2021-05-01 10:31:05| 2.13| a| 2|
|2021-05-01 10:30:50| 2.14| a| 4|
|2021-05-01 10:30:00| 2.15| a| 5|
|2021-05-01 10:31:10| 2.16| b| 1|
|2021-05-01 10:31:11| 2.17| b| 2|
+-------------------+-----+-----+----+
rank
函数按 orderBy
子句对数据进行排名,因此您不能按其他列对数据进行排名。您可以使用它作为替代方案
df = (df
.withColumn("rank", F.array_sort(F.collect_set('value').over(w)))
.withColumn('rank', F.expr("array_position(rank, value)")))
df.show()
+-------------------+-----+-----+----+
| ts|value|group|rank|
+-------------------+-----+-----+----+
|2021-05-01 10:31:10| 2.16| b| 1|
|2021-05-01 10:30:00| 2.15| a| 1|
|2021-05-01 10:30:10| 2.12| a| 1|
|2021-05-01 10:30:20| 2.13| a| 2|
|2021-05-01 10:30:50| 2.14| a| 3|
|2021-05-01 10:31:05| 2.13| a| 2|
|2021-05-01 10:31:10| 2.16| a| 5|
+-------------------+-----+-----+----+
如果你想获得dense_rank
,使用collect_list