PySpark - 一种更有效的计算公共元素的方法
PySpark - A more efficient method to count common elements
我有两个数据帧,比如 dfA
和 dfB
。
我想取他们的交叉点,然后计算该交叉点中唯一 user_ids
的数量。
我试过以下非常慢而且经常崩溃的方法:
dfA.join(broadcast(dfB), ['user_id'], how='inner').select('user_id').dropDuplicates().count()
我需要 运行 很多这样的台词,才能得到一个情节。
如何高效地执行此类查询?
如问题中所述,数据框唯一相关的部分是 user_id
列(在您的问题中,您描述了您加入 user_id
之后仅使用 user_id
字段)
当您只需要每个数据帧中一列的不同值时,性能问题的根源在于连接两个大数据帧。
为了提高性能,我将执行以下操作:
创建两个仅包含每个数据帧的 user_id
列的小型 DF
这将大大减少每个数据框的大小,因为它只包含一列(唯一相关的列)
dfAuserid = dfA.select("user_id")
dfBuserid = dfB.select("user_id")
得到distinct
(注:相当于每个dataframe的dropDuplicate()
值
这将大大减少每个数据帧的大小,因为每个新数据帧将仅包含 user_id
.
列的不同值
dfAuseridDist = dfA.select("user_id").distinct()
dfBuseridDist = dfB.select("user_id").distinct()
对上述两个极简数据帧执行join
以获得交集的唯一值
我认为您可以先 select 必要的列,然后再执行连接。在连接之前移动 dropDuplicates 也应该是有益的,因为这样你就可以摆脱在其中一个数据帧中多次出现的 user_ids。
生成的查询可能如下所示:
dfA.select("user_id").join(broadcast(dfB.select("user_id")), ['user_id'], how='inner')\
.select('user_id').dropDuplicates().count()
或:
dfA.select("user_id").dropDuplicates(["user_id",]).join(broadcast(dfB.select("user_id")\
.dropDuplicates(["user_id",])), ['user_id'], how='inner').select('user_id').count()
或者带有 distinct 的版本也应该可以工作。
dfA.select("user_id").distinct().join(broadcast(dfB.select("user_id").distinct()),\
['user_id'], how='inner').select('user_id').count()
我有两个数据帧,比如 dfA
和 dfB
。
我想取他们的交叉点,然后计算该交叉点中唯一 user_ids
的数量。
我试过以下非常慢而且经常崩溃的方法:
dfA.join(broadcast(dfB), ['user_id'], how='inner').select('user_id').dropDuplicates().count()
我需要 运行 很多这样的台词,才能得到一个情节。
如何高效地执行此类查询?
如问题中所述,数据框唯一相关的部分是 user_id
列(在您的问题中,您描述了您加入 user_id
之后仅使用 user_id
字段)
当您只需要每个数据帧中一列的不同值时,性能问题的根源在于连接两个大数据帧。
为了提高性能,我将执行以下操作:
创建两个仅包含每个数据帧的
user_id
列的小型 DF
这将大大减少每个数据框的大小,因为它只包含一列(唯一相关的列)dfAuserid = dfA.select("user_id") dfBuserid = dfB.select("user_id")
得到
列的不同值distinct
(注:相当于每个dataframe的dropDuplicate()
值
这将大大减少每个数据帧的大小,因为每个新数据帧将仅包含user_id
.dfAuseridDist = dfA.select("user_id").distinct() dfBuseridDist = dfB.select("user_id").distinct()
对上述两个极简数据帧执行
join
以获得交集的唯一值
我认为您可以先 select 必要的列,然后再执行连接。在连接之前移动 dropDuplicates 也应该是有益的,因为这样你就可以摆脱在其中一个数据帧中多次出现的 user_ids。
生成的查询可能如下所示:
dfA.select("user_id").join(broadcast(dfB.select("user_id")), ['user_id'], how='inner')\
.select('user_id').dropDuplicates().count()
或:
dfA.select("user_id").dropDuplicates(["user_id",]).join(broadcast(dfB.select("user_id")\
.dropDuplicates(["user_id",])), ['user_id'], how='inner').select('user_id').count()
或者带有 distinct 的版本也应该可以工作。
dfA.select("user_id").distinct().join(broadcast(dfB.select("user_id").distinct()),\
['user_id'], how='inner').select('user_id').count()