在 Spark 中聚合大型数据集 SQL
Aggregating large Datasets in Spark SQL
考虑以下代码:
case class Person(
personId: Long, name: String, ageGroup: String, gender: String,
relationshipStatus: String, country: String, state: String
)
case class PerPersonPower(personId: Long, power: Double)
val people: Dataset[Person] = ... // Around 50 million entries.
val powers: Dataset[PerPersonPower] = ... // Around 50 million entries.
people.join(powers, "personId")
.groupBy("ageGroup", "gender", "relationshipStatus", "country", "state")
.agg(
sum("power").alias("totalPower"),
count("*").alias("personCount")
)
它在具有大约 100 GB RAM 的集群上执行。但是,集群内存不足。我不知道该怎么办。实际上,people
被 $"personId"
分区并缓存 - people.repartition($"personId").cache()
.
关于如何优化此计算的任何想法?
该集群是普通的 Google Dataproc 集群 --- 因此它在客户端模式下使用 YARN --- 由 14 个节点组成,每个节点具有 8 GB RAM。
根据请求中可用的有限信息,我建议不要使用缓存并创建比默认数量多一点的分区(通常为 200,但可能因集群而异)- 尝试设置 spark.shuffle.partitions
在您的应用程序中从 1000 或 2000 开始。它可以像 spark.conf.set('spark.shuffle.partitions', 1000)
那样完成。很可能您的查询命中 SortMergeJoin 并且当前执行程序获取更多数据,它是堆减去 YARN 开销。请咨询您的 以监控和优化您的查询执行。在 SQL 选项卡中,您会看到有关每个阶段正在处理的数据量的非常详细的数字,因此您会发现瓶颈并更快地修复它们。
Spark 查询规划器将首先按 spark.shuffle.partitions
中定义的 personId 对 PerPersonPower
和 Person
进行排序,将其刷新到 HDFS 到 spark.shuffle.partitions
单独的镶木地板文件中,然后创建相同数量的部分聚合并将它们放入生成的数据框中。
您似乎加入了大约 18-20GB(人)的数据和大约 800MB(功率)。如果功率会小一点,您可以尝试使用 BroadcastHashJoin 就像 people.join(broadcast(powers), "personId")
,但我不建议广播大于 128Mb 或 256Mb 的数据帧。
祝你好运!
考虑以下代码:
case class Person(
personId: Long, name: String, ageGroup: String, gender: String,
relationshipStatus: String, country: String, state: String
)
case class PerPersonPower(personId: Long, power: Double)
val people: Dataset[Person] = ... // Around 50 million entries.
val powers: Dataset[PerPersonPower] = ... // Around 50 million entries.
people.join(powers, "personId")
.groupBy("ageGroup", "gender", "relationshipStatus", "country", "state")
.agg(
sum("power").alias("totalPower"),
count("*").alias("personCount")
)
它在具有大约 100 GB RAM 的集群上执行。但是,集群内存不足。我不知道该怎么办。实际上,people
被 $"personId"
分区并缓存 - people.repartition($"personId").cache()
.
关于如何优化此计算的任何想法?
该集群是普通的 Google Dataproc 集群 --- 因此它在客户端模式下使用 YARN --- 由 14 个节点组成,每个节点具有 8 GB RAM。
根据请求中可用的有限信息,我建议不要使用缓存并创建比默认数量多一点的分区(通常为 200,但可能因集群而异)- 尝试设置 spark.shuffle.partitions
在您的应用程序中从 1000 或 2000 开始。它可以像 spark.conf.set('spark.shuffle.partitions', 1000)
那样完成。很可能您的查询命中 SortMergeJoin 并且当前执行程序获取更多数据,它是堆减去 YARN 开销。请咨询您的
Spark 查询规划器将首先按 spark.shuffle.partitions
中定义的 personId 对 PerPersonPower
和 Person
进行排序,将其刷新到 HDFS 到 spark.shuffle.partitions
单独的镶木地板文件中,然后创建相同数量的部分聚合并将它们放入生成的数据框中。
您似乎加入了大约 18-20GB(人)的数据和大约 800MB(功率)。如果功率会小一点,您可以尝试使用 BroadcastHashJoin 就像 people.join(broadcast(powers), "personId")
,但我不建议广播大于 128Mb 或 256Mb 的数据帧。
祝你好运!