pyspark: duplicate row with column value from another row
My input df:
+-------------------+------------+
|        windowStart|      nodeId|
+-------------------+------------+
|2022-03-11 14:00:00|           1|
|2022-03-11 15:00:00|           2|
|2022-03-11 16:00:00|           3|
+-------------------+------------+
I want to duplicate each row, reusing the windowStart value from the following row, so the output should look like this:
+-------------------+------------+
|        windowStart|      nodeId|
+-------------------+------------+
|2022-03-11 14:00:00|           1|
|2022-03-11 15:00:00|           1|
|2022-03-11 15:00:00|           2|
|2022-03-11 16:00:00|           2|
|2022-03-11 16:00:00|           3|
+-------------------+------------+
How can I achieve this? Thanks!
df = spark.createDataFrame(
    [
        ('2022-03-11 14:00:00', '1'),
        ('2022-03-11 15:00:00', '2'),
        ('2022-03-11 16:00:00', '3'),
    ],
    ['windowStart', 'nodeId'],
)
from pyspark.sql import Window as W
from pyspark.sql import functions as F

# A window ordered over the whole frame; without partitionBy, Spark warns
# that all data moves to a single partition (fine for small data).
w = W.orderBy('windowStart')

# For each row, take the next row's windowStart via F.lead, keep the
# original nodeId, and drop the last row, where lead() yields null.
df_next = df \
    .withColumn('nextWindowStart', F.lead(F.col('windowStart'), 1).over(w)) \
    .select(F.col('nextWindowStart').alias('windowStart'), 'nodeId') \
    .filter(F.col('windowStart').isNotNull())

# Union the shifted copies with the original rows.
df.union(df_next) \
    .orderBy('windowStart', 'nodeId') \
    .show()
+-------------------+------+
| windowStart|nodeId|
+-------------------+------+
|2022-03-11 14:00:00| 1|
|2022-03-11 15:00:00| 1|
|2022-03-11 15:00:00| 2|
|2022-03-11 16:00:00| 2|
|2022-03-11 16:00:00| 3|
+-------------------+------+
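If you prefer to avoid the self-union, an equivalent single-pass variant (a sketch, not part of the original answer; the helper column name next_ws is mine) pairs each windowStart with the next one and explodes the pair:

from pyspark.sql import Window as W
from pyspark.sql import functions as F

w = W.orderBy('windowStart')

# Pair each row's windowStart with the next row's, explode the pair into
# two rows, and drop the trailing null that lead() produces on the last row.
(
    df.withColumn('next_ws', F.lead('windowStart', 1).over(w))
      .select(F.explode(F.array('windowStart', 'next_ws')).alias('windowStart'), 'nodeId')
      .filter(F.col('windowStart').isNotNull())
      .orderBy('windowStart', 'nodeId')
      .show()
)

Both approaches produce the output shown above; the explode version just evaluates df once instead of referencing it twice in a union.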