Pyspark:用户为每个唯一 ID 定义的弹性
Pyspark : User defined elasticity for each unique id
我正在研究价格弹性问题,我需要计算每个唯一 ID 的弹性
我的数据框看起来像
| id | price | items |
| 101 | 5 | 10 |
| 101 | 10 | 15 |
| 101 | 12 | 20 |
| 102 | 1 | 1 |
| 102 | 3 | 7 |
求弹性:
考虑101的例子,价格发生了3次变化,这3次价格变化和对应的商品变化应该是新的dataframe。
1) 价格变化 5(5 -> 10(或 10 -> 5))导致 5 项变化(10 -> 15(或 15 -> 10)),因此相应的行将是 pricechange =5 , itemschange=5
2) 价格变化 7(5 -> 12(或 12 -> 5))导致 10 件商品变化(10 -> 20(或 20 -> 10)),因此相应的行将是 pricechange =7 , itemschange=10
3) 2 的价格变化(10 -> 12(或 12 -> 10))导致 5 项变化(15 -> 20(或 20 -> 15))因此相应的行将是 pricechange =2 , itemschange=5
数据框将转换为:
| id | pricechange | itemschange |
| 101 | 5 | 5 |
| 101 | 7 | 10 |
| 101 | 2 | 5 |
| 102 | 2 | 6 |
这是您可以遵循的详细方法 -
定义模式并为 DF 准备数据
df = spark.createDataFrame([
(101,5,10),
(101,10,15),
(101,12,20),
(102,1,1),
(102,3,7)
],'id : int, price : int, item: int')
创建虚拟列排名以将每个 ID 与具有相同 ID 的所有其他记录进行比较
from pyspark.sql.window import Window
from pyspark.sql.functions import *
windowSpec = Window.partitionBy('id').orderBy('id')
rank = row_number().over(windowSpec).alias('rank')
df = df.withColumn("rank", rank)
最终逻辑 - 加入和过滤
df.alias('a').\
join(df.alias('b'),on='id').\
where('a.rank < b.rank').\
selectExpr("a.id as id","b.price - a.price as price","b.item - a.item as item").\
show()
恕我直言 - post 到目前为止您尝试过的内容以及 error/issue 您面临的问题总是更好。这有助于获得快速和更好的响应。
您可以简单地在 id
列上将 DataFrame 与其自身进行内部连接。为避免重复记录,定义一个 where
子句,要求左侧 DataFrame 的价格大于右侧 DataFrame 的价格。
加入后,select 所需的列:
from pyspark.sql.functions import col
df.alias("r").join(df.alias("l"), on="id")\
.where("l.price > r.price")\
.select(
"id",
(col("l.price") - col("r.price")).alias("pricechange"),
(col("l.item") - col("r.item")).alias("itemschange"),
).show()
#+---+-----------+-----------+
#| id|pricechange|itemschange|
#+---+-----------+-----------+
#|101| 2| 5|
#|101| 7| 10|
#|101| 5| 5|
#|102| 2| 6|
#+---+-----------+-----------+
这比使用 Window
更有效。
我正在研究价格弹性问题,我需要计算每个唯一 ID 的弹性
我的数据框看起来像
| id | price | items |
| 101 | 5 | 10 |
| 101 | 10 | 15 |
| 101 | 12 | 20 |
| 102 | 1 | 1 |
| 102 | 3 | 7 |
求弹性: 考虑101的例子,价格发生了3次变化,这3次价格变化和对应的商品变化应该是新的dataframe。
1) 价格变化 5(5 -> 10(或 10 -> 5))导致 5 项变化(10 -> 15(或 15 -> 10)),因此相应的行将是 pricechange =5 , itemschange=5
2) 价格变化 7(5 -> 12(或 12 -> 5))导致 10 件商品变化(10 -> 20(或 20 -> 10)),因此相应的行将是 pricechange =7 , itemschange=10
3) 2 的价格变化(10 -> 12(或 12 -> 10))导致 5 项变化(15 -> 20(或 20 -> 15))因此相应的行将是 pricechange =2 , itemschange=5
数据框将转换为:
| id | pricechange | itemschange |
| 101 | 5 | 5 |
| 101 | 7 | 10 |
| 101 | 2 | 5 |
| 102 | 2 | 6 |
这是您可以遵循的详细方法 -
定义模式并为 DF 准备数据
df = spark.createDataFrame([
(101,5,10),
(101,10,15),
(101,12,20),
(102,1,1),
(102,3,7)
],'id : int, price : int, item: int')
创建虚拟列排名以将每个 ID 与具有相同 ID 的所有其他记录进行比较
from pyspark.sql.window import Window
from pyspark.sql.functions import *
windowSpec = Window.partitionBy('id').orderBy('id')
rank = row_number().over(windowSpec).alias('rank')
df = df.withColumn("rank", rank)
最终逻辑 - 加入和过滤
df.alias('a').\
join(df.alias('b'),on='id').\
where('a.rank < b.rank').\
selectExpr("a.id as id","b.price - a.price as price","b.item - a.item as item").\
show()
恕我直言 - post 到目前为止您尝试过的内容以及 error/issue 您面临的问题总是更好。这有助于获得快速和更好的响应。
您可以简单地在 id
列上将 DataFrame 与其自身进行内部连接。为避免重复记录,定义一个 where
子句,要求左侧 DataFrame 的价格大于右侧 DataFrame 的价格。
加入后,select 所需的列:
from pyspark.sql.functions import col
df.alias("r").join(df.alias("l"), on="id")\
.where("l.price > r.price")\
.select(
"id",
(col("l.price") - col("r.price")).alias("pricechange"),
(col("l.item") - col("r.item")).alias("itemschange"),
).show()
#+---+-----------+-----------+
#| id|pricechange|itemschange|
#+---+-----------+-----------+
#|101| 2| 5|
#|101| 7| 10|
#|101| 5| 5|
#|102| 2| 6|
#+---+-----------+-----------+
这比使用 Window
更有效。