在 PySpark Dataframe 中从一列到另一列的最近日期
Closest Date looking from One Column to another in PySpark Dataframe
我有一个 pyspark 数据框,其中提到了商品的价格,但没有商品购买时间的数据,我只有 1 年的范围。
+---------+------------+----------------+----------------+
|Commodity| BuyingPrice|Date_Upper_limit|Date_lower_limit|
+---------+------------+----------------+----------------+
| Apple| 5| 2020-07-04| 2019-07-03|
| Banana| 3| 2020-07-03| 2019-07-02|
| Banana| 4| 2019-10-02| 2018-10-01|
| Apple| 6| 2020-01-20| 2019-01-19|
| Banana| 3.5| 2019-08-17| 2018-08-16|
+---------+------------+----------------+----------------+
我有另一个 pyspark 数据框,我可以在其中查看所有商品的市场价格和日期。
+----------+----------+------------+
| Date| Commodity|Market Price|
+----------+----------+------------+
|2020-07-01| Apple| 3|
|2020-07-01| Banana| 3|
|2020-07-02| Apple| 4|
|2020-07-02| Banana| 2.5|
|2020-07-03| Apple| 7|
|2020-07-03| Banana| 4|
+----------+----------+------------+
当该商品的市场价格 (MP) < 或 = 购买价格 (BP) 时,我想查看最接近日期上限的日期。
预期输出(前 2 列):
+---------+------------+----------------+----------------+--------------------------------+
|Commodity| BuyingPrice|Date_Upper_limit|Date_lower_limit|Closest Date to UL when MP <= BP|
+---------+------------+----------------+----------------+--------------------------------+
| Apple| 5| 2020-07-04| 2019-07-03| 2020-07-02|
| Banana| 3| 2020-07-03| 2019-07-02| 2020-07-02|
+---------+------------+----------------+----------------+--------------------------------+
尽管 Apple 在 2020-07-01 ($3) 时要低得多,但自 2020-07-02 以来是第一个从 MP <= BP 的日期上限 (UL) 倒退的日期。所以,我选择了2020-07-02。
如何向后查看填充可能购买的日期?
试试 conditional join
和 window function
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().partitionBy("Commodity")
df1\ #first dataframe shown being df1 and second being df2
.join(df2.withColumnRenamed("Commodity","Commodity1")\
, F.expr("""`Market Price`<=BuyingPrice and Date<Date_Upper_limit and Commodity==Commodity1"""))\
.drop("Market Price","Commodity1")\
.withColumn("max", F.max("Date").over(w))\
.filter('max==Date').drop("max").withColumnRenamed("Date","Closest Date to UL when MP <= BP")\
.show()
#+---------+-----------+----------------+----------------+--------------------------------+
#|Commodity|BuyingPrice|Date_Upper_limit|Date_lower_limit|Closest Date to UL when MP <= BP|
#+---------+-----------+----------------+----------------+--------------------------------+
#| Banana| 3.0| 2020-07-03| 2019-07-02| 2020-07-02|
#| Apple| 5.0| 2020-07-04| 2019-07-03| 2020-07-02|
#+---------+-----------+----------------+----------------+--------------------------------+
我有一个 pyspark 数据框,其中提到了商品的价格,但没有商品购买时间的数据,我只有 1 年的范围。
+---------+------------+----------------+----------------+
|Commodity| BuyingPrice|Date_Upper_limit|Date_lower_limit|
+---------+------------+----------------+----------------+
| Apple| 5| 2020-07-04| 2019-07-03|
| Banana| 3| 2020-07-03| 2019-07-02|
| Banana| 4| 2019-10-02| 2018-10-01|
| Apple| 6| 2020-01-20| 2019-01-19|
| Banana| 3.5| 2019-08-17| 2018-08-16|
+---------+------------+----------------+----------------+
我有另一个 pyspark 数据框,我可以在其中查看所有商品的市场价格和日期。
+----------+----------+------------+
| Date| Commodity|Market Price|
+----------+----------+------------+
|2020-07-01| Apple| 3|
|2020-07-01| Banana| 3|
|2020-07-02| Apple| 4|
|2020-07-02| Banana| 2.5|
|2020-07-03| Apple| 7|
|2020-07-03| Banana| 4|
+----------+----------+------------+
当该商品的市场价格 (MP) < 或 = 购买价格 (BP) 时,我想查看最接近日期上限的日期。
预期输出(前 2 列):
+---------+------------+----------------+----------------+--------------------------------+
|Commodity| BuyingPrice|Date_Upper_limit|Date_lower_limit|Closest Date to UL when MP <= BP|
+---------+------------+----------------+----------------+--------------------------------+
| Apple| 5| 2020-07-04| 2019-07-03| 2020-07-02|
| Banana| 3| 2020-07-03| 2019-07-02| 2020-07-02|
+---------+------------+----------------+----------------+--------------------------------+
尽管 Apple 在 2020-07-01 ($3) 时要低得多,但自 2020-07-02 以来是第一个从 MP <= BP 的日期上限 (UL) 倒退的日期。所以,我选择了2020-07-02。
如何向后查看填充可能购买的日期?
试试 conditional join
和 window function
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().partitionBy("Commodity")
df1\ #first dataframe shown being df1 and second being df2
.join(df2.withColumnRenamed("Commodity","Commodity1")\
, F.expr("""`Market Price`<=BuyingPrice and Date<Date_Upper_limit and Commodity==Commodity1"""))\
.drop("Market Price","Commodity1")\
.withColumn("max", F.max("Date").over(w))\
.filter('max==Date').drop("max").withColumnRenamed("Date","Closest Date to UL when MP <= BP")\
.show()
#+---------+-----------+----------------+----------------+--------------------------------+
#|Commodity|BuyingPrice|Date_Upper_limit|Date_lower_limit|Closest Date to UL when MP <= BP|
#+---------+-----------+----------------+----------------+--------------------------------+
#| Banana| 3.0| 2020-07-03| 2019-07-02| 2020-07-02|
#| Apple| 5.0| 2020-07-04| 2019-07-03| 2020-07-02|
#+---------+-----------+----------------+----------------+--------------------------------+