过滤行在另一个 DataFrame 范围内的 pyspark DataFrame
Filtering a pyspark DataFrame where rows are within a range of another DataFrame
我想从一个 DataFrame (df1
) 中检索所有行,使其 id
在另一个 DataFrame (df2
).
示例:
df1.show()
#+-----+---+
#| word| id|
#+-----+---+
#|apple| 10|
#| cat| 30|
#+-----+---+
df2.show()
#+----+---+
#|word| id|
#+----+---+
#|some| 50|
#|jeff| 3|
#| etc|100|
#+----+---+
期望的结果:
+-----+---+
| word| id|
+-----+---+
|apple| 10|
+-----+---+
这是因为 "apple"
在 "jeff"
的 10 以内。
如您所见,如果 df1
中的 id
满足 df2
中任何 id
的条件,则行是好的。两个 DataFrame 的长度也不一定相同。
我已经很清楚如何为精确匹配执行 isin
或 antijoin
之类的操作,但我不清楚这种更宽松的情况。
编辑:我的一个新想法是,如果没有预构建或干净的方法来执行此操作,则可能支持基于已定义函数的复杂过滤(如果它们是可并行化的)。如果我找到那个方向的方法,我将开始 google 追踪并更新。
编辑:到目前为止,我偶然发现了 udf
函数,但我还没有设法让它工作。我想我需要让它以某种方式接受一列而不是单个数字。这是我目前所拥有的..
columns = ['word', 'id']
vals = [
("apple",10),
("cat",30)
]
df1 = sqlContext.createDataFrame(vals, columns)
vals = [
("some",50),
("jeff",3),
("etc",100)
]
df2 = sqlContext.createDataFrame(vals, columns)
def inRange(id1,id2,delta):
id1 = int(id1)
id2 = int(id2)
return id1>=id2-delta and id1<=id2+delta
inRangeUDF = udf(inRange,BooleanType())
df1.filter(inRangeUDF(df1.id,df2.id, 10)).show()
这当前引发错误
TypeError: Invalid argument, not a string or column: 10 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
您不能将 DataFrame 传递给 udf
。完成此操作的自然方法是使用 join
:
import pyspark.sql.functions as f
df1.alias('l').join(
df2.alias('r'),
on=f.abs(f.col('l.id') - f.col('r.id')) <= 10
).select('l.*').show()
#+-----+---+
#| word| id|
#+-----+---+
#|apple| 10|
#+-----+---+
我使用 alias
来避免在指定 DataFrame 列名称时出现歧义。这会将 df1
连接到 df2
,其中 df1.id
和 df2.id
之间的差值的绝对值小于或等于 10,并且仅选择 [=14= 中的列].
我想从一个 DataFrame (df1
) 中检索所有行,使其 id
在另一个 DataFrame (df2
).
示例:
df1.show()
#+-----+---+
#| word| id|
#+-----+---+
#|apple| 10|
#| cat| 30|
#+-----+---+
df2.show()
#+----+---+
#|word| id|
#+----+---+
#|some| 50|
#|jeff| 3|
#| etc|100|
#+----+---+
期望的结果:
+-----+---+
| word| id|
+-----+---+
|apple| 10|
+-----+---+
这是因为 "apple"
在 "jeff"
的 10 以内。
如您所见,如果 df1
中的 id
满足 df2
中任何 id
的条件,则行是好的。两个 DataFrame 的长度也不一定相同。
我已经很清楚如何为精确匹配执行 isin
或 antijoin
之类的操作,但我不清楚这种更宽松的情况。
编辑:我的一个新想法是,如果没有预构建或干净的方法来执行此操作,则可能支持基于已定义函数的复杂过滤(如果它们是可并行化的)。如果我找到那个方向的方法,我将开始 google 追踪并更新。
编辑:到目前为止,我偶然发现了 udf
函数,但我还没有设法让它工作。我想我需要让它以某种方式接受一列而不是单个数字。这是我目前所拥有的..
columns = ['word', 'id']
vals = [
("apple",10),
("cat",30)
]
df1 = sqlContext.createDataFrame(vals, columns)
vals = [
("some",50),
("jeff",3),
("etc",100)
]
df2 = sqlContext.createDataFrame(vals, columns)
def inRange(id1,id2,delta):
id1 = int(id1)
id2 = int(id2)
return id1>=id2-delta and id1<=id2+delta
inRangeUDF = udf(inRange,BooleanType())
df1.filter(inRangeUDF(df1.id,df2.id, 10)).show()
这当前引发错误
TypeError: Invalid argument, not a string or column: 10 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
您不能将 DataFrame 传递给 udf
。完成此操作的自然方法是使用 join
:
import pyspark.sql.functions as f
df1.alias('l').join(
df2.alias('r'),
on=f.abs(f.col('l.id') - f.col('r.id')) <= 10
).select('l.*').show()
#+-----+---+
#| word| id|
#+-----+---+
#|apple| 10|
#+-----+---+
我使用 alias
来避免在指定 DataFrame 列名称时出现歧义。这会将 df1
连接到 df2
,其中 df1.id
和 df2.id
之间的差值的绝对值小于或等于 10,并且仅选择 [=14= 中的列].