Pyspark:如何过滤两个列值对的列表?
Pyspark: How to filter on list of two column value pairs?
所以我有一个 PySpark 数据框,我想用两列的有效 对 的(长)列表进行过滤。
假设我们的数据框名称是 df
并且列 col1
和 col2
:
col1 col2
1 A
2 B
3 1
null 2
A null
2 null
1 null
B C
我的有效配对列表为:flist=[(1,A), (null,2), (1,null)]
当我尝试使用 .isin()
函数(如下所示)时,它告诉我 .isin()
不适用于元组。
df.filter((df["col1"],df["col2"]).isin(flist))
通过连接两个字符串或为每对写下一个布尔表达式,已经有解决方法,但我有一长串有效对(很难变成布尔值)并且连接也不可靠,因为空值。使用 Python (df['col1'],df['col2']) in flist
也不起作用。
有 Pythonic/PySparkic 方法吗?
您可以使用列表创建 filder_df
并进行连接:
flist = [("1", "A"), (None, "2"), ("1", None)]
filter_df = spark.createDataFrame(flist, ["col1", "col2"])
df1 = df.join(filter_df, ["col1", "col2"])
df1.show()
#+----+----+
#|col1|col2|
#+----+----+
#| 1| A|
#+----+----+
请注意,您无法比较空值。所以这里只返回元组 ("1", "A")
的行。要检查空值,您需要在列上使用 isNull()
:
df1 = df.alias("df").join(
filter_df.alias("fdf"),
((F.col("df.col1") == F.col("fdf.col1")) |
(col("df.col1").isNull() & F.col("fdf.col1").isNull())
) &
((F.col("df.col2") == F.col("fdf.col2")) |
(col("df.col2").isNull() & F.col("fdf.col2").isNull())
)
).select("df.*")
df1.show()
#+----+----+
#|col1|col2|
#+----+----+
#| 1| A|
#|null| 2|
#| 1|null|
#+----+----+
或者按照@Chris 的回答中的建议更好地使用 eqNullSafe
。
这是一种无需加入的方法,您可以在过滤器中链接一系列条件,以便将每一行与 flist
中的值进行比较。它可以处理空值。
from functools import reduce
import pyspark.sql.functions as F
flist = [(1, 'A'), (None, 2), (1, None)]
df2 = df.filter(
reduce(
lambda x, y: x | y,
[
((F.col('col1') == col1) if col1 is not None else F.col('col1').isNull()) &
((F.col('col2') == col2) if col2 is not None else F.col('col2').isNull())
for (col1, col2) in flist
]
)
)
df2.show()
+----+----+
|col1|col2|
+----+----+
| 1| A|
|null| 2|
| 1|null|
+----+----+
基于@blackbishop 根据过滤条件创建 Dataframe 并加入的方法,您可以使用 Column.eqNullSafe
方法安全地比较空值:
df = spark.createDataFrame(
[('1', 'A', 1),
('2', 'B', 2),
('3', '1', 3),
(None, '2', 4),
('A', None, 5),
('2', None, 6),
('1', None, 7),
('B', 'C', 8)], schema=['col1', 'col2', 'col3'])
flist = [("1", "A"), (None, "2"), ("1", None)]
filter_df = spark.createDataFrame(flist, ["col1", "col2"])
(df.join(filter_df,
df["col1"].eqNullSafe(filter_df["col1"]) &
df["col2"].eqNullSafe(filter_df['col2']))
.select(df['col1'], df['col2'], df['col3'])
.show())
给出:
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1|null| 7|
|null| 2| 4|
| 1| A| 1|
+----+----+----+
请注意,如果您的 'filter' Dataframe 包含唯一行,则连接仅充当过滤器。您可以在连接之前在该 Dataframe 上添加一个 distinct
以确保(例如,如果您的过滤条件很大)。
所以我有一个 PySpark 数据框,我想用两列的有效 对 的(长)列表进行过滤。
假设我们的数据框名称是 df
并且列 col1
和 col2
:
col1 col2
1 A
2 B
3 1
null 2
A null
2 null
1 null
B C
我的有效配对列表为:flist=[(1,A), (null,2), (1,null)]
当我尝试使用 .isin()
函数(如下所示)时,它告诉我 .isin()
不适用于元组。
df.filter((df["col1"],df["col2"]).isin(flist))
通过连接两个字符串或为每对写下一个布尔表达式,已经有解决方法,但我有一长串有效对(很难变成布尔值)并且连接也不可靠,因为空值。使用 Python (df['col1'],df['col2']) in flist
也不起作用。
有 Pythonic/PySparkic 方法吗?
您可以使用列表创建 filder_df
并进行连接:
flist = [("1", "A"), (None, "2"), ("1", None)]
filter_df = spark.createDataFrame(flist, ["col1", "col2"])
df1 = df.join(filter_df, ["col1", "col2"])
df1.show()
#+----+----+
#|col1|col2|
#+----+----+
#| 1| A|
#+----+----+
请注意,您无法比较空值。所以这里只返回元组 ("1", "A")
的行。要检查空值,您需要在列上使用 isNull()
:
df1 = df.alias("df").join(
filter_df.alias("fdf"),
((F.col("df.col1") == F.col("fdf.col1")) |
(col("df.col1").isNull() & F.col("fdf.col1").isNull())
) &
((F.col("df.col2") == F.col("fdf.col2")) |
(col("df.col2").isNull() & F.col("fdf.col2").isNull())
)
).select("df.*")
df1.show()
#+----+----+
#|col1|col2|
#+----+----+
#| 1| A|
#|null| 2|
#| 1|null|
#+----+----+
或者按照@Chris 的回答中的建议更好地使用 eqNullSafe
。
这是一种无需加入的方法,您可以在过滤器中链接一系列条件,以便将每一行与 flist
中的值进行比较。它可以处理空值。
from functools import reduce
import pyspark.sql.functions as F
flist = [(1, 'A'), (None, 2), (1, None)]
df2 = df.filter(
reduce(
lambda x, y: x | y,
[
((F.col('col1') == col1) if col1 is not None else F.col('col1').isNull()) &
((F.col('col2') == col2) if col2 is not None else F.col('col2').isNull())
for (col1, col2) in flist
]
)
)
df2.show()
+----+----+
|col1|col2|
+----+----+
| 1| A|
|null| 2|
| 1|null|
+----+----+
基于@blackbishop 根据过滤条件创建 Dataframe 并加入的方法,您可以使用 Column.eqNullSafe
方法安全地比较空值:
df = spark.createDataFrame(
[('1', 'A', 1),
('2', 'B', 2),
('3', '1', 3),
(None, '2', 4),
('A', None, 5),
('2', None, 6),
('1', None, 7),
('B', 'C', 8)], schema=['col1', 'col2', 'col3'])
flist = [("1", "A"), (None, "2"), ("1", None)]
filter_df = spark.createDataFrame(flist, ["col1", "col2"])
(df.join(filter_df,
df["col1"].eqNullSafe(filter_df["col1"]) &
df["col2"].eqNullSafe(filter_df['col2']))
.select(df['col1'], df['col2'], df['col3'])
.show())
给出:
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1|null| 7|
|null| 2| 4|
| 1| A| 1|
+----+----+----+
请注意,如果您的 'filter' Dataframe 包含唯一行,则连接仅充当过滤器。您可以在连接之前在该 Dataframe 上添加一个 distinct
以确保(例如,如果您的过滤条件很大)。