Getting the error of a subprocess back to the master when using rdd.pipe
When using rdd.pipe(command), errors from the subprocess are not returned to the master. For example, if one does:
sc.parallelize(Range(0, 10)).pipe("ls fileThatDontExist").collect
The stack trace is the following:
java.lang.Exception: Subprocess exited with status 1
at org.apache.spark.rdd.PipedRDD$$anon.hasNext(PipedRDD.scala:161)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.rdd.PipedRDD$$anon.foreach(PipedRDD.scala:153)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at org.apache.spark.rdd.PipedRDD$$anon.to(PipedRDD.scala:153)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at org.apache.spark.rdd.PipedRDD$$anon.toBuffer(PipedRDD.scala:153)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at org.apache.spark.rdd.PipedRDD$$anon.toArray(PipedRDD.scala:153)
at org.apache.spark.rdd.RDD$$anonfun$collect$$anonfun.apply(RDD.scala:885)
at org.apache.spark.rdd.RDD$$anonfun$collect$$anonfun.apply(RDD.scala:885)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1767)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1264)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1263)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
The error that actually occurred in the command is not mentioned here; you have to dig through the executor logs to find it:
ls: fileThatDontExist: No such file or directory
Looking at the code of PipedRDD, it seems it would be possible to add more information when the exception is thrown (for example, by including the content of proc.getErrorStream in the message):
val exitStatus = proc.waitFor()
if (exitStatus != 0) {
  throw new Exception("Subprocess exited with status " + exitStatus)
}
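For illustration, here is a minimal sketch of the kind of change being suggested; it is purely hypothetical and not actual Spark code. In the real PipedRDD the error stream is already drained by a separate thread, so an actual patch would have to capture the output there; this only illustrates the idea.

// Hypothetical sketch: append the child's stderr to the exception message
// so it reaches the driver instead of staying only in the executor logs.
val exitStatus = proc.waitFor()
if (exitStatus != 0) {
  val stderr = scala.io.Source.fromInputStream(proc.getErrorStream).mkString
  throw new Exception(
    s"Subprocess exited with status $exitStatus, stderr: $stderr")
}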
I have two questions about this. Is there a reason for not doing it? And is anyone aware of a workaround?
For now I have wrapped the process execution: when the process fails, I return 0 and emit the process's stderr prefixed with a marker. I then map over the RDD and, for any line containing the marker, throw an exception carrying that stderr (roughly as sketched below).
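As a rough sketch of that workaround (the wrapper script name run_wrapped.sh and the marker string are placeholders I am assuming, not part of the original setup):

// Hypothetical sketch of the marker-based workaround described above.
// run_wrapped.sh is assumed to run the real command, always exit 0, and on
// failure re-emit the command's stderr on stdout prefixed with MARKER.
val MARKER = "##STDERR##"

val piped = sc.parallelize(Range(0, 10)).pipe("./run_wrapped.sh ls fileThatDontExist")

// Fail the job with the captured stderr if any marked line comes back.
val checked = piped.map { line =>
  if (line.startsWith(MARKER)) {
    throw new RuntimeException("Piped command failed: " + line.stripPrefix(MARKER))
  }
  line
}
checked.collect()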
As of now (Spark 1.6), the current behaviour is to print the stderr of the spawned process to the standard error output of the executor. This appears to be a very early choice by Spark's own creator, Matei Zaharia, as you can see here, dating back to 2011. I do not see any other way of collecting stderr in the current implementation.
A change was recently made in Spark 2.0 to propagate any exception from the child process to the calling process (see SPARK-13793), and a minor change has been made to the exception thrown when the exit status is different from 0 (see this line).
This could be proposed as an improvement; let me know if you need any help suggesting it as an enhancement to Spark.