如何根据另一个数据框过滤数据框?
how to filter a dataframe based on another dataframe?
我在 pyspark 中有两个 Dataframe:
d1: (x,y,value)
和 d2:(k,v, value)
。 d1 中的条目是唯一的(你可以认为列 x 是唯一的,而 y 单独作为键)
x y value
a b 0.2
c d 0.4
e f 0,8
d2 格式如下:
k v value
a c 0.7
k k 0.3
j h 0.8
e p 0.1
a b 0.1
我需要根据 d1 上的共现来过滤 d2。即,a , c 0.7 and e p 0.1
应该被删除,因为 a
只能出现在 b
中,类似地 e
.
我尝试 select 从 d1 的 x 和 y 列。
sourceList = df1.select("x").collect()
sourceList = [row.x for row in sourceList]
sourceList_b = sc.broadcast(sourceList)
然后
check_id_isin = sf.udf(lambda x: x in sourceList , BooleanType())
d2 = d2.where(~d2.k.isin(sourceList_b.value))
对于小型数据集,它运行良好,但对于大型数据集,收集会导致异常。我想知道是否有更好的逻辑来计算这一步。
您可能想要做的是从关系的角度来考虑这个问题。在 d1.x = d2.k AND d1.y = d2.kv 上加入 d1 和 d2。内部联接将删除 D2 中在 d1 中没有对应对的任何记录。通过加入一个加入,spark 将在集群范围内对数据进行洗牌,与广播交换相比,可以实现更大的并行性和可扩展性,广播交换通常最多约 10mb 的数据(这是 spark 用作洗牌之间的切入点)加入和广播加入。
此外,在大多数情况下,WHERE (a,b) IS IN (...) 会被转换为连接,除非 (...) 是一小组数据。
一种方法可以是join
d1到d2,然后使用coalesce从v列中填充y列中的缺失值,然后过滤y和v不同的行,例如:
import pyspark.sql.functions as F
(d2.join( d1.select('x','y').withColumnRenamed('x','k'), #rename x to k for easier join
on=['k'], how='left') #join left to keep only d2 rows
.withColumn('y', F.coalesce('y', 'v')) #fill the value missing in y with the one from v
.filter((F.col('v') == F.col('y'))) #keep only where the value in v are equal to y
.drop('y').show()) #drop the column y not necessary
你得到:
+---+---+-----+
| k| v|value|
+---+---+-----+
| k| k| 0.3|
| j| h| 0.8|
+---+---+-----+
并且还应该保留任何行,其中一对 (x,y) 中的两个值都在 (k,v)
所以你这里有两个问题:
- 连接这两个表的逻辑:
这可以通过对两列而不是一列执行内部联接来完成。这是代码:
# Create an expression wherein you do an inner join on two cols
joinExpr = ((d1.x = d2.k) & (d1.y == d2.y))
joinDF = d1.join(d2, joinExpr)
- 第二个问题是速度。有多种修复方法。这是我的前两个:
一个。如果其中一个数据帧明显小于另一个数据帧(通常小于 2 GB),则可以使用广播连接。它本质上是将较小的数据帧复制给所有工作人员,因此在加入时无需洗牌。这是一个例子:
from pyspark.sql.functions import broadcast
joinExpr = ((d1.x = d2.k) & (d1.y == d2.y))
joinDF = d1.join(broadcast(d2), joinExpr)
b。尝试添加更多工作人员并增加内存。
我在 pyspark 中有两个 Dataframe:
d1: (x,y,value)
和 d2:(k,v, value)
。 d1 中的条目是唯一的(你可以认为列 x 是唯一的,而 y 单独作为键)
x y value
a b 0.2
c d 0.4
e f 0,8
d2 格式如下:
k v value
a c 0.7
k k 0.3
j h 0.8
e p 0.1
a b 0.1
我需要根据 d1 上的共现来过滤 d2。即,a , c 0.7 and e p 0.1
应该被删除,因为 a
只能出现在 b
中,类似地 e
.
我尝试 select 从 d1 的 x 和 y 列。
sourceList = df1.select("x").collect()
sourceList = [row.x for row in sourceList]
sourceList_b = sc.broadcast(sourceList)
然后
check_id_isin = sf.udf(lambda x: x in sourceList , BooleanType())
d2 = d2.where(~d2.k.isin(sourceList_b.value))
对于小型数据集,它运行良好,但对于大型数据集,收集会导致异常。我想知道是否有更好的逻辑来计算这一步。
您可能想要做的是从关系的角度来考虑这个问题。在 d1.x = d2.k AND d1.y = d2.kv 上加入 d1 和 d2。内部联接将删除 D2 中在 d1 中没有对应对的任何记录。通过加入一个加入,spark 将在集群范围内对数据进行洗牌,与广播交换相比,可以实现更大的并行性和可扩展性,广播交换通常最多约 10mb 的数据(这是 spark 用作洗牌之间的切入点)加入和广播加入。
此外,在大多数情况下,WHERE (a,b) IS IN (...) 会被转换为连接,除非 (...) 是一小组数据。
一种方法可以是join
d1到d2,然后使用coalesce从v列中填充y列中的缺失值,然后过滤y和v不同的行,例如:
import pyspark.sql.functions as F
(d2.join( d1.select('x','y').withColumnRenamed('x','k'), #rename x to k for easier join
on=['k'], how='left') #join left to keep only d2 rows
.withColumn('y', F.coalesce('y', 'v')) #fill the value missing in y with the one from v
.filter((F.col('v') == F.col('y'))) #keep only where the value in v are equal to y
.drop('y').show()) #drop the column y not necessary
你得到:
+---+---+-----+
| k| v|value|
+---+---+-----+
| k| k| 0.3|
| j| h| 0.8|
+---+---+-----+
并且还应该保留任何行,其中一对 (x,y) 中的两个值都在 (k,v)
所以你这里有两个问题:
- 连接这两个表的逻辑:
这可以通过对两列而不是一列执行内部联接来完成。这是代码:
# Create an expression wherein you do an inner join on two cols
joinExpr = ((d1.x = d2.k) & (d1.y == d2.y))
joinDF = d1.join(d2, joinExpr)
- 第二个问题是速度。有多种修复方法。这是我的前两个:
一个。如果其中一个数据帧明显小于另一个数据帧(通常小于 2 GB),则可以使用广播连接。它本质上是将较小的数据帧复制给所有工作人员,因此在加入时无需洗牌。这是一个例子:
from pyspark.sql.functions import broadcast
joinExpr = ((d1.x = d2.k) & (d1.y == d2.y))
joinDF = d1.join(broadcast(d2), joinExpr)
b。尝试添加更多工作人员并增加内存。