PySpark - 一种更有效的计算公共元素的方法

PySpark - A more efficient method to count common elements

我有两个数据帧,比如 dfAdfB
我想取他们的交叉点,然后计算该交叉点中唯一 user_ids 的数量。

我试过以下非常慢而且经常崩溃的方法:

dfA.join(broadcast(dfB), ['user_id'], how='inner').select('user_id').dropDuplicates().count()

我需要 运行 很多这样的台词,才能得到一个情节。

如何高效地执行此类查询?

如问题中所述,数据框唯一相关的部分是 user_id 列(在您的问题中,您描述了您加入 user_id 之后仅使用 user_id 字段)

当您只需要每个数据帧中一列的不同值时,性能问题的根源在于连接两个大数据帧。

为了提高性能,我将执行以下操作:

  1. 创建两个仅包含每个数据帧的 user_id 列的小型 DF
    这将大大减少每个数据框的大小,因为它只包含一列(唯一相关的列)

    dfAuserid = dfA.select("user_id")
    dfBuserid = dfB.select("user_id")
    
  2. 得到distinct(注:相当于每个dataframe的dropDuplicate()
    这将大大减少每个数据帧的大小,因为每个新数据帧将仅包含 user_id.

    列的不同值
    dfAuseridDist = dfA.select("user_id").distinct()
    dfBuseridDist = dfB.select("user_id").distinct()
    
  3. 对上述两个极简数据帧执行join以获得交集的唯一值

我认为您可以先 select 必要的列,然后再执行连接。在连接之前移动 dropDuplicates 也应该是有益的,因为这样你就可以摆脱在其中一个数据帧中多次出现的 user_ids。

生成的查询可能如下所示:

dfA.select("user_id").join(broadcast(dfB.select("user_id")), ['user_id'], how='inner')\
    .select('user_id').dropDuplicates().count()

或:

dfA.select("user_id").dropDuplicates(["user_id",]).join(broadcast(dfB.select("user_id")\
    .dropDuplicates(["user_id",])), ['user_id'], how='inner').select('user_id').count()

或者带有 distinct 的版本也应该可以工作。

dfA.select("user_id").distinct().join(broadcast(dfB.select("user_id").distinct()),\
    ['user_id'], how='inner').select('user_id').count()