为什么每个 Spark 任务都没有利用所有分配的核心?

Why is each Spark Task not utilizing all allocated cores?

假设每个执行器有 36 个内核,每个节点有一个执行器,3 个节点每个有 48 个可用内核。我注意到的基本要点是,当我将每个任务设置为使用 1 个核心(默认值)时,我对工作人员的 CPU 利用率约为 70%,并且每个执行者将同时执行 36 个任务(因为我会预料到的)。但是,当我将配置更改为每个任务有 6 个内核 (--conf spark.task.cpus=6) 时,每个执行程序一次减少到 6 个任务(如预期的那样),但我的 CPU 利用率也下降到 10 以下利用率百分比(意外)。我本以为 Spark 知道如何在 6 个内核上并行化工作负载。

重要的实现细节是我是 运行 DataFrame 列上的 UDF 函数,并将结果作为新列附加到该数据框上。此 UDF 函数使用 @transient 对象,该对象提供我正在使用的机器学习算法。此 UDF 函数不是聚合或合并操作的一部分,它只是对列的 map 操作,如下所示:

def myUdf = udf { ... }

val resultSet = myUdf(dataFrame.col("originalCol"))
val dataFrameWithResults = dataFrame.withColumn("originalColMetric", resultSet)

我原以为 Spark 会执行 6 myUdf 一次处理 6 条记录,每个核心一个,但事实并非如此。有没有办法解决这个问题(无需向 Spark 项目提交 PR),或者至少有人可以解释为什么会发生这种情况?

预料到这个问题,我正在尝试增加每个任务的内核数量,以减少每个执行程序所需的 RAM 数量。在这种情况下,一次执行太多任务会以指数方式增加 RAM 使用率。

spark.task.cpus 为每个任务分配的内核数 。它用于为单个任务分配多个内核,以防用户代码是多线程的。如果您的 udf 不使用多个(不在单个函数调用中生成多个线程)线程,那么核心就被浪费了。

to process 6 records at a time

分配 6 个核心,spark.task.cpus 设置为 1。如果要限制节点上的任务数,则减少每个节点提供的核心数。

本质上,Spark 可以自行确定如何通过将记录拆分到每个任务(根据分区)并确定每个执行器可以同时处理多少个任务,从而同时将 UDF 映射到多个记录上。但是,Spark 不能自动拆分每个核心每个任务的工作。要在每个任务中使用多个内核,需要编写 UDF 中的代码(每个任务一次(按顺序)对一个记录执行),以便在该 UDF 中对单个记录进行并行计算。