过滤 lead/lag 为特定值的行(window 带过滤器)
Filter rows where the lead/lag are specific values (window with filter)
我有这样一个数据框:
id x y
1 a 1 P
2 a 2 S
3 b 3 P
4 b 4 S
我想保留 y 的 'lead' 值为 'S' 的行,这样我得到的数据框将是:
id x y
1 a 1 P
2 b 3 P
我可以使用 PySpark 完成以下操作:
getLeadPoint = udf(lambda x: 'S' if (y == 'S') else 'NOTS', StringType())
windowSpec = Window.partitionBy(df['id'])
df = df.withColumn('lead_point', getLeadPoint(lead(df.y).over(windowSpec)))
dfNew = df.filter(df.lead_point == 'S')
但是,在这里,我改变了一个不必要的列然后过滤。
我想做的是使用 lead() 进行过滤,但无法正常工作:
dfNew = df.filter(lead(df.y).over(windowSpec) == 'S')
关于如何通过使用窗口的直接过滤器实现结果有什么想法吗?
R 等价于:
library(dplyr)
df %>% group_by(id) %>% filter(lead(y) == 'S')
效率不高,但你可以用索引压缩,然后创建一个新的 RDD,在其中将索引加 1,然后加入索引,然后它变成一个简单的过滤操作。
假设您的数据如下所示:
df = sc.parallelize([
("a", 1, 1, "P"), ("a", 2, 2, "S"),
("b", 4, 2, "S"), ("b", 3, 1, "P"), ("b", 2, 3, "P"), ("b", 3, 3, "S")
]).toDF(["id", "x", "timestamp", "y"])
和window规范等同于
from pyspark.sql.functions import lead, col
from pyspark.sql import Window
w = Window.partitionBy("id").orderBy("timestamp")
您可以简单地添加列并将其用于过滤:
(df
.withColumn("lead_y", lead("y").over(w))
.where(col("lead_y") == "S").drop("lead_y"))
它不是很漂亮,但会比 UDF 调用更有效。
我有这样一个数据框:
id x y
1 a 1 P
2 a 2 S
3 b 3 P
4 b 4 S
我想保留 y 的 'lead' 值为 'S' 的行,这样我得到的数据框将是:
id x y
1 a 1 P
2 b 3 P
我可以使用 PySpark 完成以下操作:
getLeadPoint = udf(lambda x: 'S' if (y == 'S') else 'NOTS', StringType())
windowSpec = Window.partitionBy(df['id'])
df = df.withColumn('lead_point', getLeadPoint(lead(df.y).over(windowSpec)))
dfNew = df.filter(df.lead_point == 'S')
但是,在这里,我改变了一个不必要的列然后过滤。
我想做的是使用 lead() 进行过滤,但无法正常工作:
dfNew = df.filter(lead(df.y).over(windowSpec) == 'S')
关于如何通过使用窗口的直接过滤器实现结果有什么想法吗?
R 等价于:
library(dplyr)
df %>% group_by(id) %>% filter(lead(y) == 'S')
效率不高,但你可以用索引压缩,然后创建一个新的 RDD,在其中将索引加 1,然后加入索引,然后它变成一个简单的过滤操作。
假设您的数据如下所示:
df = sc.parallelize([
("a", 1, 1, "P"), ("a", 2, 2, "S"),
("b", 4, 2, "S"), ("b", 3, 1, "P"), ("b", 2, 3, "P"), ("b", 3, 3, "S")
]).toDF(["id", "x", "timestamp", "y"])
和window规范等同于
from pyspark.sql.functions import lead, col
from pyspark.sql import Window
w = Window.partitionBy("id").orderBy("timestamp")
您可以简单地添加列并将其用于过滤:
(df
.withColumn("lead_y", lead("y").over(w))
.where(col("lead_y") == "S").drop("lead_y"))
它不是很漂亮,但会比 UDF 调用更有效。