Pyspark: Filtering rows using window functions

I have the following dataframe and data:

Id        data          time
9999      RANGE= 0  14-02-2022 03:47
9999      RANGE= 0  14-02-2022 03:51
9999      RANGE= 1  14-02-2022 03:59
9999      RANGE= 1  14-02-2022 04:01
9999      RANGE= 0  14-02-2022 13:04
9999      RANGE= 0  14-02-2022 13:06
9999      RANGE= 0  14-02-2022 13:10
9999      RANGE= 1  14-02-2022 13:14
9999      RANGE= 1  14-02-2022 13:16
9999      RANGE= 1  14-02-2022 13:18
9999      RANGE= 0  14-02-2022 13:19
9999      RANGE= 0  14-02-2022 13:20
9999      RANGE= 0  14-02-2022 13:23

First look for a RANGE= 1 record, then ignore all following RANGE= 1 records until a RANGE= 0 record appears; from there, look for the next RANGE= 1 record while ignoring the RANGE= 0 records, and so on. In other words, keep only the first record of each run of identical data values.

The final dataframe should look like:

Id        data          time
9999      RANGE= 1  14-02-2022 03:59
9999      RANGE= 0  14-02-2022 13:04
9999      RANGE= 1  14-02-2022 13:14
9999      RANGE= 0  14-02-2022 13:19

How can I achieve this with PySpark window functions?

One way is to use a window function: compare each row's data with the previous row's value via F.lag and keep only the rows where it changes.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# single (global) window ordered by time
my_window = Window.orderBy("time")

# pull the previous row's data value and flag every change
df = df.withColumn("prev", F.lag(df.data).over(my_window))
df = df.withColumn("diff", F.when(df.data == df.prev, 0).otherwise(1))

# keep only the change rows; the very first row is also kept, since its prev is null
df = df.filter(df.diff == 1).drop("prev", "diff")
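
Because lag is null on the first row, that row always survives the filter, even when it is RANGE= 0 (here: 14-02-2022 03:47). If the first row should only be kept when it is already RANGE= 1, a minimal sketch of one way to do it, used in place of the last filter line above (the combined condition is my own variation, not part of the original answer):

# alternative to df.filter(df.diff == 1): also drop a leading RANGE= 0 row
df = df.filter(
    (F.col("prev").isNotNull() & (F.col("data") != F.col("prev")))  # a real change
    | (F.col("prev").isNull() & (F.col("data") == "RANGE= 1"))      # first row already RANGE= 1
).drop("prev", "diff")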

A fuller, self-contained version of the same idea uses lead instead of lag and keeps the very first row separately (via row_number) when it is already RANGE= 1; note the extra 03:40 record added to the sample data to exercise that case.

from pyspark.sql.window import Window
from pyspark.sql import functions as F

data = [('9999','RANGE= 1','14-02-2022 03:40'),
        ('9999','RANGE= 0','14-02-2022 03:47'),('9999','RANGE= 0','14-02-2022 03:51'),
        ('9999','RANGE= 1','14-02-2022 03:59'),('9999','RANGE= 1','14-02-2022 04:01'),
        ('9999','RANGE= 0','14-02-2022 13:04'),('9999','RANGE= 0','14-02-2022 13:06'),
        ('9999','RANGE= 0','14-02-2022 13:10'),('9999','RANGE= 1','14-02-2022 13:14'),
        ('9999','RANGE= 1','14-02-2022 13:16'),('9999','RANGE= 1','14-02-2022 13:18'),
        ('9999','RANGE= 0','14-02-2022 13:19'),('9999','RANGE= 0','14-02-2022 13:20'),
        ('9999','RANGE= 0','14-02-2022 13:23')]

cols = ["Id","data","time"]
df = spark.createDataFrame(data = data, schema = cols)

# single-partition window ordered by time
w = Window.partitionBy().orderBy("time")
df = df.withColumn("rown", F.row_number().over(w))

# keep the very first record separately if it is already RANGE= 1
df2 = df.filter(df.rown == 1)
df2 = df2.filter(df2.data == 'RANGE= 1').drop("rown")

# compare each row with the next one; a mismatch marks a transition,
# and the transition row itself is available as (next_data, next_time)
df = df.withColumn("next_time", F.lead(df.time).over(w))
df = df.withColumn("next_data", F.lead(df.data).over(w))
df = df.filter(df.next_data != df.data).drop("rown", "data", "time")

Then we union df2 and df to get the final result.
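
That last step is not shown in the post; a minimal sketch of the union, assuming the df and df2 built above (the column renaming and the final orderBy are my additions so the two frames line up):

# align df's (next_data, next_time) with df2's (data, time) before the union
result = df2.unionByName(
    df.select(
        F.col("Id"),
        F.col("next_data").alias("data"),
        F.col("next_time").alias("time"),
    )
).orderBy("time")

result.show(truncate=False)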