根据前一行对 pyspark 列中的一行进行操作

operating on a row in a pyspark column based on previous row

我有一个 pyspark 数据框,我想根据另一列 _MS 的值更新 drift_MS 列。但是,我将应用的数学会根据 _MS

的条件而有所不同

数据框:

|SEQ_ID |TIME_STAMP             |_MS               |
+-------+-----------------------+------------------+
|3879826|2021-07-29 11:24:20.525|NaN               |
|3879826|2021-07-29 11:25:56.934|21.262409581399556|
|3879826|2021-07-29 11:27:43.264|27.247600203353613|
|3879826|2021-07-29 11:29:27.613|18.13528511851038 |
|3879826|2021-07-29 11:31:10.512|2.520896614376871 |
|3879826|2021-07-29 11:32:54.252|2.7081931585605541|
|3879826|2021-07-29 11:34:36.995|2.9832290627235505|
|3879826|2021-07-29 11:36:19.128|13.011968111650264|
|3879826|2021-07-29 11:38:10.919|17.762006254598797|
|3879826|2021-07-29 11:40:01.929|1.9661930950977457|

_MS >=3 且前一个 _MS 小于当前 _MS 我想将 drift_MS 增加 100。但是如果 _MS <3 并且之前的 _MS < 当前 _MS 我想将 drift_MS 增加 1,否则保持之前的 drift_MS

预期输出:

|SEQ_ID |TIME_STAMP             |_MS               |drift_MS|
+-------+-----------------------+------------------+--------+
|3879826|2021-07-29 11:24:20.525|NaN               |0       |
|3879826|2021-07-29 11:25:56.934|21.262409581399556|0       |
|3879826|2021-07-29 11:27:43.264|27.247600203353613|100     |
|3879826|2021-07-29 11:29:27.613|18.13528511851038 |100     |
|3879826|2021-07-29 11:31:10.512|2.520896614376871 |100     |
|3879826|2021-07-29 11:32:54.252|2.7081931585605541|101     |
|3879826|2021-07-29 11:34:36.995|2.9832290627235505|102     |
|3879826|2021-07-29 11:36:19.128|13.011968111650264|202     |
|3879826|2021-07-29 11:38:10.919|17.762006254598797|302     |
|3879826|2021-07-29 11:40:01.929|1.9661930950977457|302     |

我尝试了以下代码:

import pyspark.sql.functions as f

w1=Window.partitionBy('SEQ_ID').orderBy(col('TIME_STAMP').asc())
    
prev_MS = (f.lag(col('_MS'),1).over(w1))
prev_drift_MS = (f.lag(col('drift_MS'),1).over(w1))
    
df2=df.withColumn('drift_MS', when((col('_MS') < 3) & (prev_MS < col('_MS')), prev_drift_MS+1)\
.when((col('_MS') >= 3) & (prev_MS < col('_MS')), prev_drift_MS+100).otherwise(prev_drift_MS+0))

但是 drift_MS 列不是 100 就是 1。 我做错了什么?

尝试:

df.withColumn('drift_MS', 
  f.sum(
    when((col('_MS') < 3) & (prev_MS < col('_MS')), 1)
    .when((col('_MS') >= 3) & (prev_MS < col('_MS')), 100)
    .otherwise(0)
 ).over(w1))