Spark SQL Row_number() PartitionBy Sort Desc
Spark SQL Row_number() PartitionBy Sort Desc
我已经在 Spark 中使用 Window 成功创建了 row_number()
partitionBy
,但我想按降序而不是默认的升序对其进行排序。这是我的工作代码:
from pyspark import HiveContext
from pyspark.sql.types import *
from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window
data_cooccur.select("driver", "also_item", "unit_count",
F.rowNumber().over(Window.partitionBy("driver").orderBy("unit_count")).alias("rowNum")).show()
这给了我这个结果:
+------+---------+----------+------+
|driver|also_item|unit_count|rowNum|
+------+---------+----------+------+
| s10| s11| 1| 1|
| s10| s13| 1| 2|
| s10| s17| 1| 3|
在这里我添加 desc() 以降序排列:
data_cooccur.select("driver", "also_item", "unit_count", F.rowNumber().over(Window.partitionBy("driver").orderBy("unit_count").desc()).alias("rowNum")).show()
并得到这个错误:
AttributeError: 'WindowSpec' object has no attribute 'desc'
我做错了什么?
desc
应该应用于非 window 定义的列。您可以在列上使用以下任一方法:
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
F.row_number().over(
Window.partitionBy("driver").orderBy(col("unit_count").desc())
)
或独立函数:
from pyspark.sql.functions import desc
from pyspark.sql.window import Window
F.row_number().over(
Window.partitionBy("driver").orderBy(desc("unit_count"))
)
或者您可以使用 Spark-SQL 中的 SQL 代码:
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.master('local[*]')\
.appName('Test')\
.getOrCreate()
spark.sql("""
select driver
,also_item
,unit_count
,ROW_NUMBER() OVER (PARTITION BY driver ORDER BY unit_count DESC) AS rowNum
from data_cooccur
""").show()
更新 实际上,我尝试对此进行更多研究,但它似乎不起作用。 (实际上它会引发错误)。它不起作用的原因是我在 Databricks 中对 display()
的调用下有这段代码(display()
调用之后的代码永远不会是 运行)。似乎数据帧上的 orderBy()
和 window
上的 orderBy()
实际上并不相同。我会保留这个答案只是为了否定确认
从 PySpark 2.4 开始(可能更早),只需将关键字 ascending=False
添加到 orderBy
调用中即可。
例如。
personal_recos.withColumn("row_number", F.row_number().over(Window.partitionBy("COLLECTOR_NUMBER").orderBy("count", ascending=False)))
和
personal_recos.withColumn("row_number", F.row_number().over(Window.partitionBy("COLLECTOR_NUMBER").orderBy(F.col("count").desc())))
似乎给我同样的行为。
我已经在 Spark 中使用 Window 成功创建了 row_number()
partitionBy
,但我想按降序而不是默认的升序对其进行排序。这是我的工作代码:
from pyspark import HiveContext
from pyspark.sql.types import *
from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window
data_cooccur.select("driver", "also_item", "unit_count",
F.rowNumber().over(Window.partitionBy("driver").orderBy("unit_count")).alias("rowNum")).show()
这给了我这个结果:
+------+---------+----------+------+
|driver|also_item|unit_count|rowNum|
+------+---------+----------+------+
| s10| s11| 1| 1|
| s10| s13| 1| 2|
| s10| s17| 1| 3|
在这里我添加 desc() 以降序排列:
data_cooccur.select("driver", "also_item", "unit_count", F.rowNumber().over(Window.partitionBy("driver").orderBy("unit_count").desc()).alias("rowNum")).show()
并得到这个错误:
AttributeError: 'WindowSpec' object has no attribute 'desc'
我做错了什么?
desc
应该应用于非 window 定义的列。您可以在列上使用以下任一方法:
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
F.row_number().over(
Window.partitionBy("driver").orderBy(col("unit_count").desc())
)
或独立函数:
from pyspark.sql.functions import desc
from pyspark.sql.window import Window
F.row_number().over(
Window.partitionBy("driver").orderBy(desc("unit_count"))
)
或者您可以使用 Spark-SQL 中的 SQL 代码:
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.master('local[*]')\
.appName('Test')\
.getOrCreate()
spark.sql("""
select driver
,also_item
,unit_count
,ROW_NUMBER() OVER (PARTITION BY driver ORDER BY unit_count DESC) AS rowNum
from data_cooccur
""").show()
更新 实际上,我尝试对此进行更多研究,但它似乎不起作用。 (实际上它会引发错误)。它不起作用的原因是我在 Databricks 中对 display()
的调用下有这段代码(display()
调用之后的代码永远不会是 运行)。似乎数据帧上的 orderBy()
和 window
上的 orderBy()
实际上并不相同。我会保留这个答案只是为了否定确认
从 PySpark 2.4 开始(可能更早),只需将关键字 ascending=False
添加到 orderBy
调用中即可。
例如。
personal_recos.withColumn("row_number", F.row_number().over(Window.partitionBy("COLLECTOR_NUMBER").orderBy("count", ascending=False)))
和
personal_recos.withColumn("row_number", F.row_number().over(Window.partitionBy("COLLECTOR_NUMBER").orderBy(F.col("count").desc())))
似乎给我同样的行为。