如何根据 window 和 pyspark 中的条件过滤行?
How to filter rows based on window and a condition in pyspark?
我需要删除相同 ID p_id 和 key_id 缺少反馈的行,但我们确实有一些反馈。
输入
id p_id key_id feedback
1 p1 k1 happy
1 p1 k1 sad
1 p1 k2 sad
1 p1 k2
1 p2 k3
2 p1 k3 sad
输出
id p_id key_id feedback
1 p1 k1 happy
1 p1 k1 sad
1 p1 k2 sad
1 p2 k3
2 p1 k3 sad
我如何在 pyspark 中实现它?
我会创建一个名为 min_length
的新列,并按该列和 feedback
列进行过滤:
import pyspark.sql.functions as F
import pyspark.sql.window.Window as W
df = df.withColumn('min_length',
F.min(F.length(F.trim(F.col('feedback'))))
.over(W.partitionBy('id', 'p_id', 'key_id'))
)
cond = (F.col('min_length') != 0) & (F.length(F.trim(F.col('feedback'))) == 0)
df.filter(~cond)
修剪只是去除了 feedback
列中的所有空格
您可以为每个键 ([id, p_id, key_id]) 计算您在 DataFrame 中对该键的反馈数量。
然后你可以过滤你的 DataFrame 只保留你有反馈的行(feedback 不是 Null)或者你没有对该特定键的任何反馈。
代码示例如下:
key = ['id', 'p_id', 'key_id']
num_feedbacks = df.filter(col('feedback')!="")\
.groupby(key).agg(F.count('feedback').alias('num_feedbacks'))
df = df.join(num_feedbacks, on=key, how='left')\
.filter((col('feedback')!="") | (col('num_feedbacks').isNull()))\
.drop('num_feedbacks')
这给你:
+---+----+------+--------+
| id|p_id|key_id|feedback|
+---+----+------+--------+
| 2| p1| k3| sad|
| 1| p1| k1| sad|
| 1| p1| k1| happy|
| 1| p1| k2| sad|
| 1| p2| k3| |
+---+----+------+--------+
我需要删除相同 ID p_id 和 key_id 缺少反馈的行,但我们确实有一些反馈。
输入
id p_id key_id feedback
1 p1 k1 happy
1 p1 k1 sad
1 p1 k2 sad
1 p1 k2
1 p2 k3
2 p1 k3 sad
输出
id p_id key_id feedback
1 p1 k1 happy
1 p1 k1 sad
1 p1 k2 sad
1 p2 k3
2 p1 k3 sad
我如何在 pyspark 中实现它?
我会创建一个名为 min_length
的新列,并按该列和 feedback
列进行过滤:
import pyspark.sql.functions as F
import pyspark.sql.window.Window as W
df = df.withColumn('min_length',
F.min(F.length(F.trim(F.col('feedback'))))
.over(W.partitionBy('id', 'p_id', 'key_id'))
)
cond = (F.col('min_length') != 0) & (F.length(F.trim(F.col('feedback'))) == 0)
df.filter(~cond)
修剪只是去除了 feedback
列中的所有空格
您可以为每个键 ([id, p_id, key_id]) 计算您在 DataFrame 中对该键的反馈数量。 然后你可以过滤你的 DataFrame 只保留你有反馈的行(feedback 不是 Null)或者你没有对该特定键的任何反馈。
代码示例如下:
key = ['id', 'p_id', 'key_id']
num_feedbacks = df.filter(col('feedback')!="")\
.groupby(key).agg(F.count('feedback').alias('num_feedbacks'))
df = df.join(num_feedbacks, on=key, how='left')\
.filter((col('feedback')!="") | (col('num_feedbacks').isNull()))\
.drop('num_feedbacks')
这给你:
+---+----+------+--------+
| id|p_id|key_id|feedback|
+---+----+------+--------+
| 2| p1| k3| sad|
| 1| p1| k1| sad|
| 1| p1| k1| happy|
| 1| p1| k2| sad|
| 1| p2| k3| |
+---+----+------+--------+