Spark Dataframes:加入后倾斜分区

Spark Dataframes: Skewed Partition after Join

我有两个数据框,df1 有 2200 万条记录,df2 有 200 万条记录。我在 email_address 上做正确的连接作为键。

test_join = df2.join(df1, "email_address", how = 'right').cache()

两个数据框中的重复邮件(如果有的话)很少。加入后,我试图找到结果数据帧的分区大小 test_join,使用此代码:

l = builder.rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
print(max(l,key=lambda item:item[1]),min(l,key=lambda item:item[1]))

结果显示最大的分区大约是平均分区大小的 100 倍。这种分区大小的偏差导致 post-join 转换和操作出现性能问题。

我知道我可以在加入后使用 repartion(num_partitions) 命令对它进行同样的重新分区,但我的问题是为什么我会遇到这种不均匀的分区结果,有没有办法首先避免它.

P.S:只是为了检查问题是否仅与 email_address 哈希函数有关,我还检查了其他几个连接的分区大小,我还在数字键中看到了问题也加入。

@user6910411 你说对了。问题出在我的数据上,遵循一些愚蠢的约定来输入空电子邮件,这导致了这个倾斜键问题。

在检查了最大分区中的条目后,我才知道那里发生了什么。我发现这种调试技术非常有用,我相信这可以帮助面临同样问题的其他人。

顺便说一句,这是我写的函数,用于查找 RDD 分区的偏度:

from itertools import islice
def check_skewness(df):
    sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample, to make processing fast
    l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
    max_part = max(l,key=lambda item:item[1])
    min_part = min(l,key=lambda item:item[1])
    if max_part[1]/min_part[1] > 5: #if difference between largest and smallest partition size is greater than 5 times
        print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'\nSample Content of the largest Partition: \n'
        print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
    else:
        print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part

然后我只传递要检查偏度的数据框,如下所示:

check_skewness(test_join)

它给了我关于它的偏度的很好的信息。