过滤行在另一个 DataFrame 范围内的 pyspark DataFrame

Filtering a pyspark DataFrame where rows are within a range of another DataFrame

我想从一个 DataFrame (df1) 中检索所有行,使其 id 在另一个 DataFrame (df2).

示例:

df1.show()
#+-----+---+
#| word| id|
#+-----+---+
#|apple| 10|
#|  cat| 30|
#+-----+---+ 

df2.show()
#+----+---+
#|word| id|
#+----+---+
#|some| 50|
#|jeff|  3|
#| etc|100|
#+----+---+

期望的结果:

+-----+---+
| word| id|
+-----+---+
|apple| 10|
+-----+---+

这是因为 "apple""jeff" 的 10 以内。

如您所见,如果 df1 中的 id 满足 df2 中任何 id 的条件,则行是好的。两个 DataFrame 的长度也不一定相同。

我已经很清楚如何为精确匹配执行 isinantijoin 之类的操作,但我不清楚这种更宽松的情况。

编辑:我的一个新想法是,如果没有预构建或干净的方法来执行此操作,则可能支持基于已定义函数的复杂过滤(如果它们是可并行化的)。如果我找到那个方向的方法,我将开始 google 追踪并更新。

编辑:到目前为止,我偶然发现了 udf 函数,但我还没有设法让它工作。我想我需要让它以某种方式接受一列而不是单个数字。这是我目前所拥有的..

columns = ['word', 'id']
vals = [
     ("apple",10),
     ("cat",30)
]

df1 = sqlContext.createDataFrame(vals, columns)

vals = [
     ("some",50),
     ("jeff",3),
     ("etc",100)
]

df2 = sqlContext.createDataFrame(vals, columns)

def inRange(id1,id2,delta):
    id1 = int(id1)
    id2 = int(id2)
    return id1>=id2-delta and id1<=id2+delta
inRangeUDF = udf(inRange,BooleanType())

df1.filter(inRangeUDF(df1.id,df2.id, 10)).show()

这当前引发错误

TypeError: Invalid argument, not a string or column: 10 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

您不能将 DataFrame 传递给 udf。完成此操作的自然方法是使用 join:

import pyspark.sql.functions as f

df1.alias('l').join(
    df2.alias('r'), 
    on=f.abs(f.col('l.id') - f.col('r.id')) <= 10
).select('l.*').show()
#+-----+---+
#| word| id|
#+-----+---+
#|apple| 10|
#+-----+---+

我使用 alias 来避免在指定 DataFrame 列名称时出现歧义。这会将 df1 连接到 df2,其中 df1.iddf2.id 之间的差值的绝对值小于或等于 10,并且仅选择 [=14= 中的列].