根据工作人员、核心和 DataFrame 大小确定 Spark 分区的最佳数量

Determining optimal number of Spark partitions based on workers, cores and DataFrame size

Spark-land 中有几个相似但又不同的概念,围绕着如何将工作分包到不同的节点并同时执行。具体有:

相信所有的Spark集群都有一个而且只有一个 Spark Driver,然后是0+个工作节点。如果我错了,请先纠正我!假设我或多或少是正确的,让我们在这里锁定几个变量。假设我们有一个包含 1 个驱动程序和 4 个工作节点的 Spark 集群,每个工作节点上有 4 CPU 个核心(因此总共有 16 个 CPU 个核心)。所以这里的"given"是:

sparkDriverCount = 1
numWorkerNodes = 4
numCpuCores = numWorkerNodes * numCpuCoresPerWorker = 4 * 4 = 16

作为设置,我想知道如何确定一些事情。具体来说:

是的,一个 application has

What is the relationship between numWorkerNodes and numExecutors?

一个 worker 可以托管多个 executor,你可以把它想象成 worker 是集群的 machine/node,而 executor 是在那个 worker 上运行的进程(在核心中执行)。

所以`numWorkerNodes <= numExecutors'。

Is there any ration for them?

就个人而言,在一个假集群中工作,我的笔记本电脑是驱动程序,同一台笔记本电脑中的虚拟机是工作人员, 在 > 的工业集群中10k 个节点,我不需要关心这个,因为 似乎已经处理好了。

我只用:

--num-executors 64

当我 launch/submit 我的脚本和 知道,我猜,它需要召唤多少工人(当然,还要考虑其他参数,以及机)。

因此,就我个人而言,我不知道任何这样的比率。


Is there a known/generally-accepted/optimal ratio of numDFRows to numPartitions?

我不知道一个,但根据经验,您可以依赖#executors 乘以#executor.cores 的乘积,然后将其乘以 3 或 4。当然这是 启发式。在 中,它看起来像这样:

sc = SparkContext(appName = "smeeb-App")
total_cores = int(sc._conf.get('spark.executor.instances')) * int(sc._conf.get('spark.executor.cores'))
dataset = sc.textFile(input_path, total_cores * 3)

How does one calculate the 'optimal' number of partitions based on the size of the DataFrame?

这是一个很好的问题。当然这很难回答,这取决于你的数据、集群等,但正如我自己所讨论的 here

分区太少,您将拥有大量数据,尤其是在处理 时,从而使您的应用程序处于内存压力之下。

分区太多,您的 taking much pressure, since all the metadata that has to be generated from the 会随着分区数量的增加而显着增加(因为它维护临时文件等)。 *

所以你想要的是找到分区数量的最佳点,这是微调你的应用程序[=69]的一部分=]. :)

'rule of thumb' is: numPartitions = numWorkerNodes * numCpuCoresPerWorker, is it true?

啊,看到这个之前我正在写上面的启发式。所以这已经回答了,但要考虑 workerexecutor.

的区别

* 我今天刚刚失败了:, when using too many partitions caused .