Spark Dataframes:加入后倾斜分区
Spark Dataframes: Skewed Partition after Join
我有两个数据框,df1
有 2200 万条记录,df2
有 200 万条记录。我在 email_address
上做正确的连接作为键。
test_join = df2.join(df1, "email_address", how = 'right').cache()
两个数据框中的重复邮件(如果有的话)很少。加入后,我试图找到结果数据帧的分区大小 test_join
,使用此代码:
l = builder.rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
print(max(l,key=lambda item:item[1]),min(l,key=lambda item:item[1]))
结果显示最大的分区大约是平均分区大小的 100 倍。这种分区大小的偏差导致 post-join 转换和操作出现性能问题。
我知道我可以在加入后使用 repartion(num_partitions)
命令对它进行同样的重新分区,但我的问题是为什么我会遇到这种不均匀的分区结果,有没有办法首先避免它.
P.S:只是为了检查问题是否仅与 email_address 哈希函数有关,我还检查了其他几个连接的分区大小,我还在数字键中看到了问题也加入。
@user6910411 你说对了。问题出在我的数据上,遵循一些愚蠢的约定来输入空电子邮件,这导致了这个倾斜键问题。
在检查了最大分区中的条目后,我才知道那里发生了什么。我发现这种调试技术非常有用,我相信这可以帮助面临同样问题的其他人。
顺便说一句,这是我写的函数,用于查找 RDD 分区的偏度:
from itertools import islice
def check_skewness(df):
sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample, to make processing fast
l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
max_part = max(l,key=lambda item:item[1])
min_part = min(l,key=lambda item:item[1])
if max_part[1]/min_part[1] > 5: #if difference between largest and smallest partition size is greater than 5 times
print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'\nSample Content of the largest Partition: \n'
print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
else:
print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part
然后我只传递要检查偏度的数据框,如下所示:
check_skewness(test_join)
它给了我关于它的偏度的很好的信息。
我有两个数据框,df1
有 2200 万条记录,df2
有 200 万条记录。我在 email_address
上做正确的连接作为键。
test_join = df2.join(df1, "email_address", how = 'right').cache()
两个数据框中的重复邮件(如果有的话)很少。加入后,我试图找到结果数据帧的分区大小 test_join
,使用此代码:
l = builder.rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
print(max(l,key=lambda item:item[1]),min(l,key=lambda item:item[1]))
结果显示最大的分区大约是平均分区大小的 100 倍。这种分区大小的偏差导致 post-join 转换和操作出现性能问题。
我知道我可以在加入后使用 repartion(num_partitions)
命令对它进行同样的重新分区,但我的问题是为什么我会遇到这种不均匀的分区结果,有没有办法首先避免它.
P.S:只是为了检查问题是否仅与 email_address 哈希函数有关,我还检查了其他几个连接的分区大小,我还在数字键中看到了问题也加入。
@user6910411 你说对了。问题出在我的数据上,遵循一些愚蠢的约定来输入空电子邮件,这导致了这个倾斜键问题。
在检查了最大分区中的条目后,我才知道那里发生了什么。我发现这种调试技术非常有用,我相信这可以帮助面临同样问题的其他人。
顺便说一句,这是我写的函数,用于查找 RDD 分区的偏度:
from itertools import islice
def check_skewness(df):
sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample, to make processing fast
l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
max_part = max(l,key=lambda item:item[1])
min_part = min(l,key=lambda item:item[1])
if max_part[1]/min_part[1] > 5: #if difference between largest and smallest partition size is greater than 5 times
print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'\nSample Content of the largest Partition: \n'
print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
else:
print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part
然后我只传递要检查偏度的数据框,如下所示:
check_skewness(test_join)
它给了我关于它的偏度的很好的信息。