根据前一行对 pyspark 列中的一行进行操作
operating on a row in a pyspark column based on previous row
我有一个 pyspark 数据框,我想根据另一列 _MS
的值更新 drift_MS
列。但是,我将应用的数学会根据 _MS
的条件而有所不同
数据框:
|SEQ_ID |TIME_STAMP |_MS |
+-------+-----------------------+------------------+
|3879826|2021-07-29 11:24:20.525|NaN |
|3879826|2021-07-29 11:25:56.934|21.262409581399556|
|3879826|2021-07-29 11:27:43.264|27.247600203353613|
|3879826|2021-07-29 11:29:27.613|18.13528511851038 |
|3879826|2021-07-29 11:31:10.512|2.520896614376871 |
|3879826|2021-07-29 11:32:54.252|2.7081931585605541|
|3879826|2021-07-29 11:34:36.995|2.9832290627235505|
|3879826|2021-07-29 11:36:19.128|13.011968111650264|
|3879826|2021-07-29 11:38:10.919|17.762006254598797|
|3879826|2021-07-29 11:40:01.929|1.9661930950977457|
当 _MS
>=3 且前一个 _MS
小于当前 _MS
我想将 drift_MS
增加 100。但是如果 _MS
<3 并且之前的 _MS
< 当前 _MS
我想将 drift_MS
增加 1,否则保持之前的 drift_MS
值
预期输出:
|SEQ_ID |TIME_STAMP |_MS |drift_MS|
+-------+-----------------------+------------------+--------+
|3879826|2021-07-29 11:24:20.525|NaN |0 |
|3879826|2021-07-29 11:25:56.934|21.262409581399556|0 |
|3879826|2021-07-29 11:27:43.264|27.247600203353613|100 |
|3879826|2021-07-29 11:29:27.613|18.13528511851038 |100 |
|3879826|2021-07-29 11:31:10.512|2.520896614376871 |100 |
|3879826|2021-07-29 11:32:54.252|2.7081931585605541|101 |
|3879826|2021-07-29 11:34:36.995|2.9832290627235505|102 |
|3879826|2021-07-29 11:36:19.128|13.011968111650264|202 |
|3879826|2021-07-29 11:38:10.919|17.762006254598797|302 |
|3879826|2021-07-29 11:40:01.929|1.9661930950977457|302 |
我尝试了以下代码:
import pyspark.sql.functions as f
w1=Window.partitionBy('SEQ_ID').orderBy(col('TIME_STAMP').asc())
prev_MS = (f.lag(col('_MS'),1).over(w1))
prev_drift_MS = (f.lag(col('drift_MS'),1).over(w1))
df2=df.withColumn('drift_MS', when((col('_MS') < 3) & (prev_MS < col('_MS')), prev_drift_MS+1)\
.when((col('_MS') >= 3) & (prev_MS < col('_MS')), prev_drift_MS+100).otherwise(prev_drift_MS+0))
但是 drift_MS
列不是 100 就是 1。
我做错了什么?
尝试:
df.withColumn('drift_MS',
f.sum(
when((col('_MS') < 3) & (prev_MS < col('_MS')), 1)
.when((col('_MS') >= 3) & (prev_MS < col('_MS')), 100)
.otherwise(0)
).over(w1))
我有一个 pyspark 数据框,我想根据另一列 _MS
的值更新 drift_MS
列。但是,我将应用的数学会根据 _MS
数据框:
|SEQ_ID |TIME_STAMP |_MS |
+-------+-----------------------+------------------+
|3879826|2021-07-29 11:24:20.525|NaN |
|3879826|2021-07-29 11:25:56.934|21.262409581399556|
|3879826|2021-07-29 11:27:43.264|27.247600203353613|
|3879826|2021-07-29 11:29:27.613|18.13528511851038 |
|3879826|2021-07-29 11:31:10.512|2.520896614376871 |
|3879826|2021-07-29 11:32:54.252|2.7081931585605541|
|3879826|2021-07-29 11:34:36.995|2.9832290627235505|
|3879826|2021-07-29 11:36:19.128|13.011968111650264|
|3879826|2021-07-29 11:38:10.919|17.762006254598797|
|3879826|2021-07-29 11:40:01.929|1.9661930950977457|
当 _MS
>=3 且前一个 _MS
小于当前 _MS
我想将 drift_MS
增加 100。但是如果 _MS
<3 并且之前的 _MS
< 当前 _MS
我想将 drift_MS
增加 1,否则保持之前的 drift_MS
值
预期输出:
|SEQ_ID |TIME_STAMP |_MS |drift_MS|
+-------+-----------------------+------------------+--------+
|3879826|2021-07-29 11:24:20.525|NaN |0 |
|3879826|2021-07-29 11:25:56.934|21.262409581399556|0 |
|3879826|2021-07-29 11:27:43.264|27.247600203353613|100 |
|3879826|2021-07-29 11:29:27.613|18.13528511851038 |100 |
|3879826|2021-07-29 11:31:10.512|2.520896614376871 |100 |
|3879826|2021-07-29 11:32:54.252|2.7081931585605541|101 |
|3879826|2021-07-29 11:34:36.995|2.9832290627235505|102 |
|3879826|2021-07-29 11:36:19.128|13.011968111650264|202 |
|3879826|2021-07-29 11:38:10.919|17.762006254598797|302 |
|3879826|2021-07-29 11:40:01.929|1.9661930950977457|302 |
我尝试了以下代码:
import pyspark.sql.functions as f
w1=Window.partitionBy('SEQ_ID').orderBy(col('TIME_STAMP').asc())
prev_MS = (f.lag(col('_MS'),1).over(w1))
prev_drift_MS = (f.lag(col('drift_MS'),1).over(w1))
df2=df.withColumn('drift_MS', when((col('_MS') < 3) & (prev_MS < col('_MS')), prev_drift_MS+1)\
.when((col('_MS') >= 3) & (prev_MS < col('_MS')), prev_drift_MS+100).otherwise(prev_drift_MS+0))
但是 drift_MS
列不是 100 就是 1。
我做错了什么?
尝试:
df.withColumn('drift_MS',
f.sum(
when((col('_MS') < 3) & (prev_MS < col('_MS')), 1)
.when((col('_MS') >= 3) & (prev_MS < col('_MS')), 100)
.otherwise(0)
).over(w1))