Pyspark:如何过滤两个列值对的列表?

Pyspark: How to filter on list of two column value pairs?

所以我有一个 PySpark 数据框,我想用两列的有效 的(长)列表进行过滤。

假设我们的数据框名称是 df 并且列 col1col2:

col1   col2
1      A
2      B
3      1
null   2
A      null
2      null
1      null
B      C

我的有效配对列表为:flist=[(1,A), (null,2), (1,null)]

当我尝试使用 .isin() 函数(如下所示)时,它告诉我 .isin() 不适用于元组。

df.filter((df["col1"],df["col2"]).isin(flist))

通过连接两个字符串或为每对写下一个布尔表达式,已经有解决方法,但我有一长串有效对(很难变成布尔值)并且连接也不可靠,因为空值。使用 Python (df['col1'],df['col2']) in flist 也不起作用。

有 Pythonic/PySparkic 方法吗?

您可以使用列表创建 filder_df 并进行连接:

flist = [("1", "A"), (None, "2"), ("1", None)]
filter_df = spark.createDataFrame(flist, ["col1", "col2"])

df1 = df.join(filter_df, ["col1", "col2"])

df1.show()
#+----+----+
#|col1|col2|
#+----+----+
#|   1|   A|
#+----+----+

请注意,您无法比较空值。所以这里只返回元组 ("1", "A") 的行。要检查空值,您需要在列上使用 isNull() :

df1 = df.alias("df").join(
    filter_df.alias("fdf"),
    ((F.col("df.col1") == F.col("fdf.col1")) |
     (col("df.col1").isNull() & F.col("fdf.col1").isNull())
     ) &
    ((F.col("df.col2") == F.col("fdf.col2")) |
     (col("df.col2").isNull() & F.col("fdf.col2").isNull())
     )
).select("df.*")

df1.show()

#+----+----+
#|col1|col2|
#+----+----+
#|   1|   A|
#|null|   2|
#|   1|null|
#+----+----+

或者按照@Chris 的回答中的建议更好地使用 eqNullSafe

这是一种无需加入的方法,您可以在过滤器中链接一系列条件,以便将每一行与 flist 中的值进行比较。它可以处理空值。

from functools import reduce
import pyspark.sql.functions as F

flist = [(1, 'A'), (None, 2), (1, None)] 

df2 = df.filter(
    reduce(
        lambda x, y: x | y, 
        [ 
            ((F.col('col1') == col1) if col1 is not None else F.col('col1').isNull()) & 
            ((F.col('col2') == col2) if col2 is not None else F.col('col2').isNull())
            for (col1, col2) in flist
        ]
    )
)

df2.show()
+----+----+
|col1|col2|
+----+----+
|   1|   A|
|null|   2|
|   1|null|
+----+----+

基于@blackbishop 根据过滤条件创建 Dataframe 并加入的方法,您可以使用 Column.eqNullSafe 方法安全地比较空值:

df = spark.createDataFrame(
    [('1', 'A', 1),
     ('2', 'B', 2),
     ('3', '1', 3),
     (None, '2', 4),
     ('A', None, 5),
     ('2', None, 6),
     ('1', None, 7),
     ('B', 'C', 8)], schema=['col1', 'col2', 'col3'])

flist = [("1", "A"), (None, "2"), ("1", None)]
filter_df = spark.createDataFrame(flist, ["col1", "col2"])

(df.join(filter_df,
         df["col1"].eqNullSafe(filter_df["col1"]) &
         df["col2"].eqNullSafe(filter_df['col2']))
 .select(df['col1'], df['col2'], df['col3'])
 .show())

给出:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|null|   7|
|null|   2|   4|
|   1|   A|   1|
+----+----+----+

请注意,如果您的 'filter' Dataframe 包含唯一行,则连接仅充当过滤器。您可以在连接之前在该 Dataframe 上添加一个 distinct 以确保(例如,如果您的过滤条件很大)。