getExecutorMemoryStatus().size() 没有输出正确的执行者数量

getExecutorMemoryStatus().size() not outputting correct num of executors

简而言之,我需要Spark集群中executors/workers的数量,但是使用sc._jsc.sc().getExecutorMemoryStatus().size()给我1,而实际上有12个执行者.

更多详细信息,我正在尝试确定执行程序的数量并将该数量用作我要求 Spark 分布我的 RDD 的分区数量。我这样做是为了利用并行性,因为我的初始数据只是一个 运行ge 数字,但随后每个数据都在 rdd#foreach 方法中进行处理。这个过程既需要内存又需要大量计算,所以我希望 运行ge 最初驻留在与执行程序一样多的分区中,以允许所有执行程序同时处理它的块。

阅读 and seeing the documentation 中对 scala 的 getExecutorMemoryStatus 的评论,建议的命令:sc._jsc.sc().getExecutorMemoryStatus().size() 似乎是合理的。但出于某种原因,无论实际存在多少执行者,我都会得到答案 1(在我的最后一个 运行 - 它是 12)。

我是不是做错了什么?我调用了错误的方法吗?走错路了?

我运行正在独立的 Spark 集群上运行,每次都会为应用程序的 运行 启动。

这是问题的一个最小示例:

from pyspark import SparkConf, SparkContext
import datetime


def print_debug(msg):
    dbg_identifier = 'dbg_et '
    print(dbg_identifier + str(datetime.datetime.now()) + ':  ' + msg)


print_debug('*****************before configuring sparkContext')
conf = SparkConf().setAppName("reproducing_bug_not_all_executors_working")
sc = SparkContext(conf=conf)
print_debug('*****************after configuring sparkContext')


def main():
    executors_num = sc._jsc.sc().getExecutorMemoryStatus().size()
    list_rdd = sc.parallelize([1, 2, 3, 4, 5], executors_num)
    print_debug('line before loop_a_lot. Number of partitions created={0}, 
        while number of executors is {1}'
          .format(list_rdd.getNumPartitions(), executors_num))
    list_rdd.foreach(loop_a_lot)
    print_debug('line after loop_a_lot')


def loop_a_lot(x):
    y = x
    print_debug('started working on item %d at ' % x + str(datetime.datetime.now()))
    for i in range(100000000):
        y = y*y/6+5
    print_debug('--------------------finished working on item %d at ' % x + str(datetime.datetime.now())
      + 'with a result: %.3f' % y)

if __name__ == "__main__":
    main()

并显示问题 - 上次我 运行 我在驱动程序的输出中得到了它(仅粘贴相关部分,占位符而不是真实的 ips 和端口):

$> grep -E 'dbg_et|Worker:54 - Starting Spark worker' slurm-<job-num>.out
2018-07-14 20:48:26 INFO  Worker:54 - Starting Spark worker <ip1>:<port1> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:26 INFO  Worker:54 - Starting Spark worker <ip1>:<port2> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip2>:<port3> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip2>:<port4> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip3>:<port5> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip3>:<port6> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip4>:<port7> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip4>:<port8> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip5>:<port9> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip5>:<port10> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip6>:<port11> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip6>:<port12> with 10 cores, 124.9 GB RAM
dbg_et 2018-07-14 20:48:37.044785:  *****************before configuring sparkContext
dbg_et 2018-07-14 20:48:38.708370:  *****************after configuring sparkContext
dbg_et 2018-07-14 20:48:39.046295:  line before loop_a_lot. Number of partitions created=1, while number of executors is 1
dbg_et 2018-07-14 20:50:11.181091:  line after loop_a_lot

并且在 worker_dir 中 Spark 为 运行 创建了一个新目录,它有 12 个子目录 - 只有其中一个(这次是目录 5)有一个副本脚本和一个非空输出,这是有意义的,因为执行程序的误读数量 1 使得 Spark 仅在一个分区中创建 rdd。这是那个工作人员的完整输出(这个输出实际上是标准错误 - 我不知道为什么它不在标准输出中,因为它应该是):

dbg_et 2018-07-14 20:48:41.806805:  started working on item 1 at 2018-07-14 20:48:41.806733
dbg_et 2018-07-14 20:48:59.710258:  --------------------finished working on item 1 at 2018-07-14 20:48:59.710198
with a result: inf
dbg_et 2018-07-14 20:48:59.710330:  started working on item 2 at 2018-07-14 20:48:59.710315
dbg_et 2018-07-14 20:49:17.367545:  --------------------finished working on item 2 at 2018-07-14 20:49:17.367483
with a result: inf
dbg_et 2018-07-14 20:49:17.367613:  started working on item 3 at 2018-07-14 20:49:17.367592
dbg_et 2018-07-14 20:49:35.382441:  --------------------finished working on item 3 at 2018-07-14 20:49:35.381597
with a result: inf
dbg_et 2018-07-14 20:49:35.382517:  started working on item 4 at 2018-07-14 20:49:35.382501
dbg_et 2018-07-14 20:49:53.227696:  --------------------finished working on item 4 at 2018-07-14 20:49:53.227615
with a result: inf
dbg_et 2018-07-14 20:49:53.227771:  started working on item 5 at 2018-07-14 20:49:53.227755
dbg_et 2018-07-14 20:50:11.128510:  --------------------finished working on item 5 at 2018-07-14 20:50:11.128452
with a result: inf

有人可以帮助我了解导致问题的原因吗?任何的想法?可能是因为 Slurm? (正如您所见,我 grep 编辑了驱动程序的输出文件 - 我在 Slurm 之上 运行ning Spark,因为我有权访问的集群由它管理)

简短修复: 在使用 defaultParallelism_jsc.sc().getExecutorMemoryStatus() 之前留出时间(例如添加 sleep 命令),如果您使用应用程序执行的开始。

解释: 当只有一个执行者时,启动时似乎有很短的时间(我认为单个执行者是驱动程序,在某些情况下被视为执行者)。这就是为什么在主函数的顶部使用 sc._jsc.sc().getExecutorMemoryStatus() 对我来说产生了错误的数字。 defaultParallelism(1).

也发生了同样的情况

我怀疑驱动程序在让所有工作人员连接到它之前就开始使用自己作为工作人员工作。它同意使用 --total-executor-cores 12

将以下代码提交给 spark-submit 这一事实
import time

conf = SparkConf().setAppName("app_name")
sc = SparkContext(conf=conf)
log4jLogger = sc._jvm.org.apache.log4j
log = log4jLogger.LogManager.getLogger("dbg_et")

log.warn('defaultParallelism={0}, and size of executorMemoryStatus={1}'.format(sc.defaultParallelism,
           sc._jsc.sc().getExecutorMemoryStatus().size()))
time.sleep(15)
log.warn('After 15 seconds: defaultParallelism={0}, and size of executorMemoryStatus={1}'
          .format(sc.defaultParallelism, 
                  sc._jsc.sc().getExecutorMemoryStatus().size()))
rdd_collected = (sc.parallelize([1, 2, 3, 4, 5] * 200, 
spark_context_holder.getParallelismAlternative()*3)
             .map(lambda x: (x, x*x) * 2)
             .map(lambda x: x[2] + x[1])
             )
log.warn('Made rdd with {0} partitioned. About to collect.'
          .format(rdd_collected.getNumPartitions()))
rdd_collected.collect()
log.warn('And after rdd operations: defaultParallelism={0}, and size of executorMemoryStatus={1}'
          .format(sc.defaultParallelism,
                  sc._jsc.sc().getExecutorMemoryStatus().size()))

给我以下输出

> tail -n 4 slurm-<job number>.out
18/09/26 13:23:52 WARN dbg_et: defaultParallelism=2, and size of executorMemoryStatus=1
18/09/26 13:24:07 WARN dbg_et: After 15 seconds: defaultParallelism=12, and size of executorMemoryStatus=13
18/09/26 13:24:07 WARN dbg_et: Made rdd with 36 partitioned. About to collect.
18/09/26 13:24:11 WARN dbg_et: And after rdd operations: defaultParallelism=12, and size of executorMemoryStatus=13

并且检查创建工作目录的时间,我看到它就在 defaultParallelismgetExecutorMemoryStatus().size() 的正确值被记录之后 (2)。重要的是,在记录这两个参数的错误值之后,这个时间是相当长的时间(~10 秒)(参见上面带有“defaultParallelism=2”的行的时间与这些目录的时间'下面的创作)

 > ls -l --time-style=full-iso spark/worker_dir/app-20180926132351-0000/
 <permission user blah> 2018-09-26 13:24:08.909960000 +0300 0/
 <permission user blah> 2018-09-26 13:24:08.665098000 +0300 1/
 <permission user blah> 2018-09-26 13:24:08.912871000 +0300 10/
 <permission user blah> 2018-09-26 13:24:08.769355000 +0300 11/
 <permission user blah> 2018-09-26 13:24:08.931957000 +0300 2/
 <permission user blah> 2018-09-26 13:24:09.019684000 +0300 3/
 <permission user blah> 2018-09-26 13:24:09.138645000 +0300 4/
 <permission user blah> 2018-09-26 13:24:08.757164000 +0300 5/
 <permission user blah> 2018-09-26 13:24:08.996918000 +0300 6/
 <permission user blah> 2018-09-26 13:24:08.640369000 +0300 7/
 <permission user blah> 2018-09-26 13:24:08.846769000 +0300 8/
 <permission user blah> 2018-09-26 13:24:09.152162000 +0300 9/

(1) 在开始使用 getExecutorMemoryStatus() 之前,我尝试使用 defaultParallelism,正如你应该的那样,但它一直给我数字 2。现在我明白这是出于同样的原因。 运行 在独立集群上,如果驱动程序只看到 1 个执行程序,则 defaultParallelism = 2 可以在 documentation for spark.default.parallelism.

中看到

(2) 我不确定为什么在创建目录之前这些值是正确的 - 但我假设执行者的启动顺序让他们在创建目录之前连接到驱动程序。