为什么 Spark 尝试发送 GetMapOutputStatuses 时报告 "Error communicating with MapOutputTracker"?
Why is "Error communicating with MapOutputTracker" reported when Spark tries to send GetMapOutputStatuses?
我正在使用 Spark 1.3 对大量数据进行聚合。该作业包含 4 个步骤:
- 读取一个大的(1TB)序列文件(对应1天的数据)
- 过滤掉大部分并获得大约 1GB 的随机写入
- keyBy 客户
- aggregateByKey() 到为该客户构建配置文件的自定义结构,对应于每个客户的 HashMap[Long, Float]。长键是唯一的,绝不会超过 50K 个不同的条目。
我运行使用以下配置进行配置:
--name geo-extract--askTimeout \
--executor-cores 8 \
--num-executors 100 \
--executor-memory 40g \
--driver-memory 4g \
--driver-cores 8 \
--conf 'spark.storage.memoryFraction=0.25' \
--conf 'spark.shuffle.memoryFraction=0.35' \
--conf 'spark.kryoserializer.buffer.max.mb=1024' \
--conf 'spark.akka.frameSize=1024' \
--conf 'spark.akka.timeout=200' \
--conf 'spark.akka.askTimeout=111' \
--master yarn-cluster \
出现此错误:
org.apache.spark.SparkException: Error communicating with MapOutputTracker
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
...
Caused by: org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(0)]
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
... 21 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
该作业和逻辑已被证明适用于小型测试集,我什至可以 运行 该作业适用于某些日期,但不适用于其他日期。我四处搜索,发现 "Error communicating with MapOutputTracker" 与内部 Spark 消息相关的提示,但我已经增加了 "spark.akka.frameSize"、"spark.akka.timeout" 和 "spark.akka.askTimeout"(最后一个甚至没有出现在 Spark 文档上,但在 Spark 邮件列表中提到过),但无济于事。在 30 秒时仍有一些超时,我不知道如何识别或修复。
我认为没有理由因为数据大小而失败,因为过滤操作和 aggregateByKey 执行本地部分聚合的事实应该足以解决数据大小问题。任务数量为 16K(从原始输入自动获取),远远超过 100 个执行器上 运行 的 800 个核心,因此它不像通常的 "increment partitions" 提示那么简单。任何线索将不胜感激!谢谢!
发生此故障时驱动程序中发生了什么?这可能是由于驱动程序的内存压力导致它没有响应。如果我没记错的话,它在调用 GetMapOutputStatuses 时试图到达的 MapOutputTracker 在 Spark 驱动程序进程中是 运行。
如果您在该过程中由于某种原因面临长时间的 GC 或其他暂停,这将导致您在上面看到的异常。
有些事情可以尝试,当您开始看到这些错误并查看会发生什么时,尝试对驱动程序进程进行 jstacking。如果 jstack 没有响应,可能是您的驱动程序没有足够的响应。
16K 的任务听起来确实让驱动程序需要跟踪很多,您是否有机会将驱动程序内存增加到 4g 以上?
我有一个类似的问题,我的工作在较小的数据集上工作正常,但在较大的数据集上会失败。
经过大量配置更改后,我发现更改驱动程序内存设置比更改执行程序内存设置影响更大。
使用新的垃圾收集器也有很大帮助。我将以下配置用于 3 个集群,每个集群有 40 个核心。希望以下配置对您有所帮助:
spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 -
XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g
-XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions
spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 -
XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g
-XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions
spark.driver.memory=8g
spark.driver.cores=10
spark.driver.maxResultSize=8g
spark.executor.memory=16g
spark.executor.cores=25
spark.default.parallelism=50
spark.eventLog.dir=hdfs://mars02-db01/opt/spark/logs
spark.eventLog.enabled=true
spark.kryoserializer.buffer=512m
spark.kryoserializer.buffer.max=1536m
spark.rdd.compress=true
spark.storage.memoryFraction=0.15
spark.storage.MemoryStore=12g
试试下面的方法属性
spark.shuffle.reduceLocality.enabled = false.
我正在使用 Spark 1.3 对大量数据进行聚合。该作业包含 4 个步骤:
- 读取一个大的(1TB)序列文件(对应1天的数据)
- 过滤掉大部分并获得大约 1GB 的随机写入
- keyBy 客户
- aggregateByKey() 到为该客户构建配置文件的自定义结构,对应于每个客户的 HashMap[Long, Float]。长键是唯一的,绝不会超过 50K 个不同的条目。
我运行使用以下配置进行配置:
--name geo-extract--askTimeout \
--executor-cores 8 \
--num-executors 100 \
--executor-memory 40g \
--driver-memory 4g \
--driver-cores 8 \
--conf 'spark.storage.memoryFraction=0.25' \
--conf 'spark.shuffle.memoryFraction=0.35' \
--conf 'spark.kryoserializer.buffer.max.mb=1024' \
--conf 'spark.akka.frameSize=1024' \
--conf 'spark.akka.timeout=200' \
--conf 'spark.akka.askTimeout=111' \
--master yarn-cluster \
出现此错误:
org.apache.spark.SparkException: Error communicating with MapOutputTracker
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
...
Caused by: org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(0)]
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
... 21 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
该作业和逻辑已被证明适用于小型测试集,我什至可以 运行 该作业适用于某些日期,但不适用于其他日期。我四处搜索,发现 "Error communicating with MapOutputTracker" 与内部 Spark 消息相关的提示,但我已经增加了 "spark.akka.frameSize"、"spark.akka.timeout" 和 "spark.akka.askTimeout"(最后一个甚至没有出现在 Spark 文档上,但在 Spark 邮件列表中提到过),但无济于事。在 30 秒时仍有一些超时,我不知道如何识别或修复。
我认为没有理由因为数据大小而失败,因为过滤操作和 aggregateByKey 执行本地部分聚合的事实应该足以解决数据大小问题。任务数量为 16K(从原始输入自动获取),远远超过 100 个执行器上 运行 的 800 个核心,因此它不像通常的 "increment partitions" 提示那么简单。任何线索将不胜感激!谢谢!
发生此故障时驱动程序中发生了什么?这可能是由于驱动程序的内存压力导致它没有响应。如果我没记错的话,它在调用 GetMapOutputStatuses 时试图到达的 MapOutputTracker 在 Spark 驱动程序进程中是 运行。
如果您在该过程中由于某种原因面临长时间的 GC 或其他暂停,这将导致您在上面看到的异常。
有些事情可以尝试,当您开始看到这些错误并查看会发生什么时,尝试对驱动程序进程进行 jstacking。如果 jstack 没有响应,可能是您的驱动程序没有足够的响应。
16K 的任务听起来确实让驱动程序需要跟踪很多,您是否有机会将驱动程序内存增加到 4g 以上?
我有一个类似的问题,我的工作在较小的数据集上工作正常,但在较大的数据集上会失败。
经过大量配置更改后,我发现更改驱动程序内存设置比更改执行程序内存设置影响更大。 使用新的垃圾收集器也有很大帮助。我将以下配置用于 3 个集群,每个集群有 40 个核心。希望以下配置对您有所帮助:
spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 -
XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g
-XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions
spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 -
XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g
-XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions
spark.driver.memory=8g
spark.driver.cores=10
spark.driver.maxResultSize=8g
spark.executor.memory=16g
spark.executor.cores=25
spark.default.parallelism=50
spark.eventLog.dir=hdfs://mars02-db01/opt/spark/logs
spark.eventLog.enabled=true
spark.kryoserializer.buffer=512m
spark.kryoserializer.buffer.max=1536m
spark.rdd.compress=true
spark.storage.memoryFraction=0.15
spark.storage.MemoryStore=12g
试试下面的方法属性
spark.shuffle.reduceLocality.enabled = false.