What are possible reasons for receiving TimeoutException: Futures timed out after [n seconds] when working with Spark?
I am working on a Spark SQL program and I am receiving the following exception:
16/11/07 15:58:25 ERROR yarn.ApplicationMaster: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Union$$anonfun$doExecute.apply(basicOperators.scala:144)
at org.apache.spark.sql.execution.Union$$anonfun$doExecute.apply(basicOperators.scala:144)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:245)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.Union.doExecute(basicOperators.scala:144)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryColumnarTableScan.scala:129)
at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryColumnarTableScan.scala:118)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryColumnarTableScan.scala:41)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery.apply(CacheManager.scala:93)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:60)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:84)
at org.apache.spark.sql.DataFrame.persist(DataFrame.scala:1581)
at org.apache.spark.sql.DataFrame.cache(DataFrame.scala:1590)
at com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56)
at com.somecompany.ml.modeling.NewModel.generateArtifacts(FlowForNewModel.scala:32)
at com.somecompany.ml.modeling.Flow$class.run(Flow.scala:52)
at com.somecompany.ml.modeling.FlowForNewModel.run(FlowForNewModel.scala:15)
at com.somecompany.ml.Main$$anonfun.apply(Main.scala:54)
at com.somecompany.ml.Main$$anonfun.apply(Main.scala:54)
at scala.Option.getOrElse(Option.scala:121)
at com.somecompany.ml.Main$.main(Main.scala:46)
at com.somecompany.ml.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:542)
16/11/07 15:58:25 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds])
The last part of my code that I recognize from the stack trace is com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56), which takes me to this line: profilesDF.cache()
Before the caching I perform a union between two DataFrames. I have seen an answer about persisting both DataFrames before the join; I still need to cache the unioned DataFrame, since I use it in several of my transformations.
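Roughly, the flow looks like this (profilesDF is the real name from my code; dfA and dfB stand in for the two input DataFrames):

// Simplified sketch of FlowForNewModel.scala around line 56.
// dfA and dfB are made-up names for the two real inputs.
val profilesDF = dfA.unionAll(dfB) // Spark 1.x union of the two DataFrames
profilesDF.cache()                 // FlowForNewModel.scala:56, where the job fails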
I was wondering what may cause this exception to be thrown?
Searching for it got me to a link dealing with the rpc timeout exception or some security issues, which is not my problem.
If you have any idea on how to solve it I would appreciate that too, but even just understanding the problem would help me solve it.
Thanks in advance
Question: I was wondering what may cause this exception to be thrown?
Answer:
spark.sql.broadcastTimeout (default: 300) - Timeout in seconds for the broadcast wait time in broadcast joins.
spark.network.timeout (default: 120s) - Default timeout for all network interactions.

spark.network.timeout (spark.rpc.askTimeout), spark.sql.broadcastTimeout, spark.kryoserializer.buffer.max (if you are using kryo serialization), etc. are tuned with larger-than-default values in order to handle complex queries. You can start with these values and adjust accordingly to your SQL workloads.
The following options (see the spark.sql.* properties) can also be used to tune the performance of query execution. These options may be deprecated in future releases as more optimizations are performed automatically.
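As a rough sketch of how you might raise these values, the property names below are the real Spark ones quoted above, but the values are purely illustrative, not recommendations:

import org.apache.spark.SparkConf

// Illustrative values; start from the defaults quoted above and
// tune against your own workload.
val conf = new SparkConf()
  .set("spark.sql.broadcastTimeout", "3600")      // default: 300 (seconds)
  .set("spark.network.timeout", "600s")           // default: 120s
  .set("spark.kryoserializer.buffer.max", "512m") // only relevant with Kryo

// The same properties can be passed at submit time instead, e.g.
//   spark-submit --conf spark.sql.broadcastTimeout=3600 ...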
Also, for a better understanding, you can look at BroadcastHashJoin, whose doExecute method is the trigger point of the stack trace above.
protected override def doExecute(): RDD[Row] = {
  // Blocks until the broadcast side of the join has been built and
  // shipped; this Await.result is the call that throws
  // java.util.concurrent.TimeoutException once `timeout`
  // (spark.sql.broadcastTimeout) elapses.
  val broadcastRelation = Await.result(broadcastFuture, timeout)
  streamedPlan.execute().mapPartitions { streamedIter =>
    hashJoin(streamedIter, broadcastRelation.value)
  }
}
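If it helps to see the failure mode in isolation, here is a minimal, self-contained sketch (plain Scala, not Spark code; all names are made up) of how Await.result raises exactly this exception when a future outlives its timeout:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TimeoutDemo extends App {
  // A future that takes far longer to complete than we are willing to wait.
  val slow: Future[Int] = Future { Thread.sleep(5000); 42 }

  // Throws java.util.concurrent.TimeoutException:
  //   Futures timed out after [1 second]
  Await.result(slow, 1.second)
}

In BroadcastHashJoin the future is the broadcast of the build-side relation and the timeout is spark.sql.broadcastTimeout, so anything that delays the broadcast (a slow or dying executor, a busy driver, an undersized timeout) surfaces as this exception.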
Good to know that Ram's suggestion works in some cases. I would like to mention that I stumbled upon this exception a couple of times (including the one described here).
Most of the time it was due to an almost silent OOM on some executor. Check the failed tasks in the Spark UI, in the last column of the tasks table: you may notice OOM messages there.
To understand a bit of Spark's internals: broadcast data passes through the driver. The driver has a thread mechanism to collect the data from the executors and send it back to all executors. If an executor fails at some point, you may end up with these timeouts.
If dynamic allocation is enabled, try disabling it (spark.dynamicAllocation.enabled=false). You can set this Spark configuration under conf/spark-defaults.conf, pass it with --conf, or set it in code.
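A minimal sketch of those three options (the property name is the real one; the app name and everything else are illustrative):

import org.apache.spark.SparkConf

// 1) In code:
val conf = new SparkConf()
  .setAppName("FlowForNewModel") // illustrative app name
  .set("spark.dynamicAllocation.enabled", "false")

// 2) At submit time:
//      spark-submit --conf spark.dynamicAllocation.enabled=false ...

// 3) Permanently, in conf/spark-defaults.conf:
//      spark.dynamicAllocation.enabled   false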
See also:
I had set the master as local[n] when submitting the job to a YARN cluster.
Do not set the master in the code when running on a cluster; instead, use --master.
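A sketch of the difference (the hard-coded line is the mistake described above; local[4] is an illustrative stand-in for local[n]):

import org.apache.spark.SparkConf

// Breaks on a YARN cluster, because the hard-coded master overrides
// the cluster manager:
//   val conf = new SparkConf().setMaster("local[4]").setAppName("FlowForNewModel")

// Instead, leave the master out of the code entirely...
val conf = new SparkConf().setAppName("FlowForNewModel")

// ...and supply it when submitting:
//   spark-submit --master yarn --deploy-mode cluster ...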