如何根据另一个数据框过滤数据框?

how to filter a dataframe based on another dataframe?

我在 pyspark 中有两个 Dataframe:

d1: (x,y,value) 和 d2:(k,v, value)。 d1 中的条目是唯一的(你可以认为列 x 是唯一的,而 y 单独作为键)

x y value
a b 0.2
c d 0.4
e f 0,8

d2 格式如下:

k v value
a c 0.7
k k 0.3
j h 0.8
e p 0.1
a b 0.1

我需要根据 d1 上的共现来过滤 d2。即,a , c 0.7 and e p 0.1 应该被删除,因为 a 只能出现在 b 中,类似地 e.

我尝试 select 从 d1 的 x 和 y 列。

sourceList = df1.select("x").collect()
sourceList = [row.x for row in sourceList]

sourceList_b = sc.broadcast(sourceList)

然后

check_id_isin = sf.udf(lambda x: x in sourceList , BooleanType())
d2 = d2.where(~d2.k.isin(sourceList_b.value))

对于小型数据集,它运行良好,但对于大型数据集,收集会导致异常。我想知道是否有更好的逻辑来计算这一步。

您可能想要做的是从关系的角度来考虑这个问题。在 d1.x = d2.k AND d1.y = d2.kv 上加入 d1 和 d2。内部联接将删除 D2 中在 d1 中没有对应对的任何记录。通过加入一个加入,spark 将在集群范围内对数据进行洗牌,与广播交换相比,可以实现更大的并行性和可扩展性,广播交换通常最多约 10mb 的数据(这是 spark 用作洗牌之间的切入点)加入和广播加入。

此外,在大多数情况下,WHERE (a,b) IS IN (...) 会被转换为连接,除非 (...) 是一小组数据。

https://github.com/vaquarkhan/vaquarkhan/wiki/Apache-Spark--Shuffle-hash-join-vs--Broadcast-hash-join

一种方法可以是join d1到d2,然后使用coalesce从v列中填充y列中的缺失值,然后过滤y和v不同的行,例如:

import pyspark.sql.functions as F

(d2.join( d1.select('x','y').withColumnRenamed('x','k'), #rename x to k for easier join
          on=['k'], how='left') #join left to keep only d2 rows
   .withColumn('y', F.coalesce('y', 'v')) #fill the value missing in y with the one from v
   .filter((F.col('v') == F.col('y'))) #keep only where the value in v are equal to y
   .drop('y').show()) #drop the column y not necessary

你得到:

+---+---+-----+
|  k|  v|value|
+---+---+-----+
|  k|  k|  0.3|
|  j|  h|  0.8|
+---+---+-----+

并且还应该保留任何行,其中一对 (x,y) 中的两个值都在 (k,v)

所以你这里有两个问题:

  1. 连接这两个表的逻辑:

这可以通过对两列而不是一列执行内部联接来完成。这是代码:

# Create an expression wherein you do an inner join on two cols
joinExpr = ((d1.x = d2.k) & (d1.y == d2.y))
joinDF = d1.join(d2, joinExpr)
  1. 第二个问题是速度。有多种修复方法。这是我的前两个:

一个。如果其中一个数据帧明显小于另一个数据帧(通常小于 2 GB),则可以使用广播连接。它本质上是将较小的数据帧复制给所有工作人员,因此在加入时无需洗牌。这是一个例子:

from pyspark.sql.functions import broadcast

joinExpr = ((d1.x = d2.k) & (d1.y == d2.y))
joinDF = d1.join(broadcast(d2), joinExpr)

b。尝试添加更多工作人员并增加内存。