Pyspark SQL 查询以获取特定列的 +/- 20% 的行
Pyspark SQL query to get rows that are +/- 20% of a specific column
我有以下 pyspark df:
+------------------+--------+-------+
| ID| Assets|Revenue|
+------------------+--------+-------+
|201542399349300619| 1633944| 32850|
|201542399349300629| 3979760| 850914|
|201542399349300634| 3402687|1983568|
|201542399349300724| 1138291|1097553|
|201522369349300122| 1401406|1010828|
|201522369349300137| 16948| 171534|
|201522369349300142|13474056|2285323|
|201522369349300202| 481045| 241788|
|201522369349300207| 700861|1185640|
|201522369349300227| 178479| 267976|
+------------------+--------+-------+
对于每一行,我希望能够获得资产金额的 20% 以内的行。例如,对于第一行 (ID=201542399349300619),我希望能够获取资产在 1,633,944 的 20% +/- 范围内的所有行(因此在 1,307,155 到 1,960,732 之间):
+------------------+--------+-------+
| ID| Assets|Revenue|
+------------------+--------+-------+
|201542399349300619| 1633944| 32850|
|201522369349300122| 1401406|1010828|
使用这个子集 table,我想获取平均资产并将其添加为新列。所以对于上面的例子,平均资产为 (1633944+1401406) = 1517675
+------------------+--------+-------+---------+
| ID| Assets|Revenue|AvgAssets|
+------------------+--------+-------+---------+
|201542399349300619| 1633944| 32850| 1517675|
假设您的 DataFrame 具有类似于以下的架构(即 Assets
和 Revenue
是数字):
df.printSchema()
#root
# |-- ID: long (nullable = true)
# |-- Assets: integer (nullable = true)
# |-- Revenue: integer (nullable = true)
您可以 join 在您提出的条件下,DataFrame 本身。 join后,可以对Assets
列取平均值进行分组聚合。
例如:
from pyspark.sql.functions import avg, expr
df.alias("l")\
.join(
df.alias("r"),
on=expr("r.assets between l.assets*0.8 and l.assets*1.2")
)\
.groupBy("l.ID", "l.Assets", "l.Revenue")\
.agg(avg("r.Assets").alias("AvgAssets"))\
.show()
#+------------------+--------+-------+------------------+
#| ID| Assets|Revenue| AvgAssets|
#+------------------+--------+-------+------------------+
#|201542399349300629| 3979760| 850914| 3691223.5|
#|201522369349300202| 481045| 241788| 481045.0|
#|201522369349300207| 700861|1185640| 700861.0|
#|201522369349300137| 16948| 171534| 16948.0|
#|201522369349300142|13474056|2285323| 1.3474056E7|
#|201522369349300227| 178479| 267976| 178479.0|
#|201542399349300619| 1633944| 32850| 1517675.0|
#|201522369349300122| 1401406|1010828|1391213.6666666667|
#|201542399349300724| 1138291|1097553| 1138291.0|
#|201542399349300634| 3402687|1983568| 3691223.5|
#+------------------+--------+-------+------------------+
由于我们将 DataFrame 连接到自身,我们可以使用别名来引用左侧 table ("l"
) 和右侧 table ("r"
) .上面的逻辑说加入 l
到 r
条件是 r
中的资产是 l
.
中资产的 +/20%
有多种方法可以表达 +/20% 条件,但我使用 spark-sql between
表达式来查找 Assets * 0.8
和 [= 之间的行23=].
然后我们对左侧 table 的所有列 (groupBy
) 进行聚合,并对右侧 table.
的资产进行平均
生成的 AvgAssets
列是 FloatType
列,但您可以通过在 .alias("AvgAssets")
之前添加 .cast("int")
轻松将其转换为 IntegerType
if那就是你喜欢的。
另请参阅:
我有以下 pyspark df:
+------------------+--------+-------+
| ID| Assets|Revenue|
+------------------+--------+-------+
|201542399349300619| 1633944| 32850|
|201542399349300629| 3979760| 850914|
|201542399349300634| 3402687|1983568|
|201542399349300724| 1138291|1097553|
|201522369349300122| 1401406|1010828|
|201522369349300137| 16948| 171534|
|201522369349300142|13474056|2285323|
|201522369349300202| 481045| 241788|
|201522369349300207| 700861|1185640|
|201522369349300227| 178479| 267976|
+------------------+--------+-------+
对于每一行,我希望能够获得资产金额的 20% 以内的行。例如,对于第一行 (ID=201542399349300619),我希望能够获取资产在 1,633,944 的 20% +/- 范围内的所有行(因此在 1,307,155 到 1,960,732 之间):
+------------------+--------+-------+
| ID| Assets|Revenue|
+------------------+--------+-------+
|201542399349300619| 1633944| 32850|
|201522369349300122| 1401406|1010828|
使用这个子集 table,我想获取平均资产并将其添加为新列。所以对于上面的例子,平均资产为 (1633944+1401406) = 1517675
+------------------+--------+-------+---------+
| ID| Assets|Revenue|AvgAssets|
+------------------+--------+-------+---------+
|201542399349300619| 1633944| 32850| 1517675|
假设您的 DataFrame 具有类似于以下的架构(即 Assets
和 Revenue
是数字):
df.printSchema()
#root
# |-- ID: long (nullable = true)
# |-- Assets: integer (nullable = true)
# |-- Revenue: integer (nullable = true)
您可以 join 在您提出的条件下,DataFrame 本身。 join后,可以对Assets
列取平均值进行分组聚合。
例如:
from pyspark.sql.functions import avg, expr
df.alias("l")\
.join(
df.alias("r"),
on=expr("r.assets between l.assets*0.8 and l.assets*1.2")
)\
.groupBy("l.ID", "l.Assets", "l.Revenue")\
.agg(avg("r.Assets").alias("AvgAssets"))\
.show()
#+------------------+--------+-------+------------------+
#| ID| Assets|Revenue| AvgAssets|
#+------------------+--------+-------+------------------+
#|201542399349300629| 3979760| 850914| 3691223.5|
#|201522369349300202| 481045| 241788| 481045.0|
#|201522369349300207| 700861|1185640| 700861.0|
#|201522369349300137| 16948| 171534| 16948.0|
#|201522369349300142|13474056|2285323| 1.3474056E7|
#|201522369349300227| 178479| 267976| 178479.0|
#|201542399349300619| 1633944| 32850| 1517675.0|
#|201522369349300122| 1401406|1010828|1391213.6666666667|
#|201542399349300724| 1138291|1097553| 1138291.0|
#|201542399349300634| 3402687|1983568| 3691223.5|
#+------------------+--------+-------+------------------+
由于我们将 DataFrame 连接到自身,我们可以使用别名来引用左侧 table ("l"
) 和右侧 table ("r"
) .上面的逻辑说加入 l
到 r
条件是 r
中的资产是 l
.
有多种方法可以表达 +/20% 条件,但我使用 spark-sql between
表达式来查找 Assets * 0.8
和 [= 之间的行23=].
然后我们对左侧 table 的所有列 (groupBy
) 进行聚合,并对右侧 table.
生成的 AvgAssets
列是 FloatType
列,但您可以通过在 .alias("AvgAssets")
之前添加 .cast("int")
轻松将其转换为 IntegerType
if那就是你喜欢的。
另请参阅: