什么会影响在 spark 中混洗的数据量
What affects amount of data shuffled in spark
例如我在 spark 上执行一些查询,在 spark UI 中我可以看到一些查询有更多的 shuffle ,这个 shuffle 似乎是本地读取和执行者之间读取的数据量。
但是我不明白一件事,例如下面的查询从 HDFS 加载了 7GB,但是 suffle read + shuffled write 超过 10GB。但我看到其他查询也从 HDFS 加载了 7GB,而随机播放大约是 500kb。所以我不明白这一点,你能帮忙吗? shuffle 的数据量与从 hdfs 读取的数据无关?
select
nation, o_year, sum(amount) as sum_profit
from
(
select
n_name as nation, year(o_orderdate) as o_year,
l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount
from
orders o join
(select l_extendedprice, l_discount, l_quantity, l_orderkey, n_name, ps_supplycost
from part p join
(select l_extendedprice, l_discount, l_quantity, l_partkey, l_orderkey,
n_name, ps_supplycost
from partsupp ps join
(select l_suppkey, l_extendedprice, l_discount, l_quantity, l_partkey,
l_orderkey, n_name
from
(select s_suppkey, n_name
from nation n join supplier s on n.n_nationkey = s.s_nationkey
) s1 join lineitem l on s1.s_suppkey = l.l_suppkey
) l1 on ps.ps_suppkey = l1.l_suppkey and ps.ps_partkey = l1.l_partkey
) l2 on p.p_name like '%green%' and p.p_partkey = l2.l_partkey
) l3 on o.o_orderkey = l3.l_orderkey
)profit
group by nation, o_year
order by nation, o_year desc;
我强烈推荐阅读我认为 the paper on explaining the Mapreduce programming model 的内容。
基本上,不是 HDFS(或任何来源)上的数据量决定了有多少数据被洗牌。我将尝试使用三个示例进行解释:
示例 1. 打乱的数据量小于输入数据:
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
这里我们计算每个分区中的单词数(对于每个键),然后只对结果进行混洗。然后,一旦我们打乱了子计数,我们就将它们加起来。所以我们洗牌的数据量与计数量有关。所以在这种情况下,它与唯一单词的数量有关。
如果我们只有一个独特的词,我们洗牌的数据会比输入少很多。事实上,线程的数量与线程的数量一样多(所以数量很少)。
假设如果每个词都是唯一的,那么我们将洗牌更多的数据(阅读论文了解详情)。所以在这个例子中洗牌的数据量与我们有多少个唯一键(唯一词)有关。
示例 2。混洗的数据量与输入数据相同:
val wordCounts = words.map((_, 1)).groupByKey().mapValues(_.size)
在这里,我们将所有单词组合在一起,然后计算有多少个。所以我们需要随机播放所有数据。
示例 3。打乱的数据量多于输入数据:
val silly =
words.map(word =>
(word, generateReallyLongString()))
.groupByKey()
在这里,我们的映射阶段将每个单词映射到一个非常长的随机字符串,然后我们将它们按单词组合在一起。这里我们生成的数据多于输入,并且将洗牌多于输入的数据。
shuffle 是 Spark 重新分配数据的机制,因此它可以跨分区进行不同的分组。这通常涉及跨执行程序和机器复制数据。所以这里很清楚,混洗数据并不真正依赖于输入数据的数量。但是,这取决于您对输入数据执行的操作,这会导致数据在执行程序(以及机器)之间移动。请通过 http://spark.apache.org/docs/latest/programming-guide.html#shuffle-operations 了解并理解为什么改组是一个代价高昂的过程。
查看您粘贴的查询,似乎您正在执行大量连接操作(没有深入了解您正在执行的最终操作)。这肯定需要跨分区移动数据。可以通过重新访问查询并优化相同查询或以导致数据移动较少的方式操作或预处理输入数据来处理该问题(例如:将已连接的数据放在同一分区中).同样,这只是一个示例,您必须根据您的用例确定最适合您的方法。
例如我在 spark 上执行一些查询,在 spark UI 中我可以看到一些查询有更多的 shuffle ,这个 shuffle 似乎是本地读取和执行者之间读取的数据量。
但是我不明白一件事,例如下面的查询从 HDFS 加载了 7GB,但是 suffle read + shuffled write 超过 10GB。但我看到其他查询也从 HDFS 加载了 7GB,而随机播放大约是 500kb。所以我不明白这一点,你能帮忙吗? shuffle 的数据量与从 hdfs 读取的数据无关?
select
nation, o_year, sum(amount) as sum_profit
from
(
select
n_name as nation, year(o_orderdate) as o_year,
l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount
from
orders o join
(select l_extendedprice, l_discount, l_quantity, l_orderkey, n_name, ps_supplycost
from part p join
(select l_extendedprice, l_discount, l_quantity, l_partkey, l_orderkey,
n_name, ps_supplycost
from partsupp ps join
(select l_suppkey, l_extendedprice, l_discount, l_quantity, l_partkey,
l_orderkey, n_name
from
(select s_suppkey, n_name
from nation n join supplier s on n.n_nationkey = s.s_nationkey
) s1 join lineitem l on s1.s_suppkey = l.l_suppkey
) l1 on ps.ps_suppkey = l1.l_suppkey and ps.ps_partkey = l1.l_partkey
) l2 on p.p_name like '%green%' and p.p_partkey = l2.l_partkey
) l3 on o.o_orderkey = l3.l_orderkey
)profit
group by nation, o_year
order by nation, o_year desc;
我强烈推荐阅读我认为 the paper on explaining the Mapreduce programming model 的内容。
基本上,不是 HDFS(或任何来源)上的数据量决定了有多少数据被洗牌。我将尝试使用三个示例进行解释:
示例 1. 打乱的数据量小于输入数据:
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
这里我们计算每个分区中的单词数(对于每个键),然后只对结果进行混洗。然后,一旦我们打乱了子计数,我们就将它们加起来。所以我们洗牌的数据量与计数量有关。所以在这种情况下,它与唯一单词的数量有关。
如果我们只有一个独特的词,我们洗牌的数据会比输入少很多。事实上,线程的数量与线程的数量一样多(所以数量很少)。
假设如果每个词都是唯一的,那么我们将洗牌更多的数据(阅读论文了解详情)。所以在这个例子中洗牌的数据量与我们有多少个唯一键(唯一词)有关。
示例 2。混洗的数据量与输入数据相同:
val wordCounts = words.map((_, 1)).groupByKey().mapValues(_.size)
在这里,我们将所有单词组合在一起,然后计算有多少个。所以我们需要随机播放所有数据。
示例 3。打乱的数据量多于输入数据:
val silly =
words.map(word =>
(word, generateReallyLongString()))
.groupByKey()
在这里,我们的映射阶段将每个单词映射到一个非常长的随机字符串,然后我们将它们按单词组合在一起。这里我们生成的数据多于输入,并且将洗牌多于输入的数据。
shuffle 是 Spark 重新分配数据的机制,因此它可以跨分区进行不同的分组。这通常涉及跨执行程序和机器复制数据。所以这里很清楚,混洗数据并不真正依赖于输入数据的数量。但是,这取决于您对输入数据执行的操作,这会导致数据在执行程序(以及机器)之间移动。请通过 http://spark.apache.org/docs/latest/programming-guide.html#shuffle-operations 了解并理解为什么改组是一个代价高昂的过程。
查看您粘贴的查询,似乎您正在执行大量连接操作(没有深入了解您正在执行的最终操作)。这肯定需要跨分区移动数据。可以通过重新访问查询并优化相同查询或以导致数据移动较少的方式操作或预处理输入数据来处理该问题(例如:将已连接的数据放在同一分区中).同样,这只是一个示例,您必须根据您的用例确定最适合您的方法。