Getting the subprocess's error back to the master when using rdd.pipe

When using rdd.pipe(command), errors from the subprocess are not returned to the master. For example, if someone does:

sc.parallelize(Range(0, 10)).pipe("ls fileThatDontExist").collect

The stack trace is as follows:

java.lang.Exception: Subprocess exited with status 1
    at org.apache.spark.rdd.PipedRDD$$anon.hasNext(PipedRDD.scala:161)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.rdd.PipedRDD$$anon.foreach(PipedRDD.scala:153)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at org.apache.spark.rdd.PipedRDD$$anon.to(PipedRDD.scala:153)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at org.apache.spark.rdd.PipedRDD$$anon.toBuffer(PipedRDD.scala:153)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at org.apache.spark.rdd.PipedRDD$$anon.toArray(PipedRDD.scala:153)
    at org.apache.spark.rdd.RDD$$anonfun$collect$$anonfun.apply(RDD.scala:885)
    at org.apache.spark.rdd.RDD$$anonfun$collect$$anonfun.apply(RDD.scala:885)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1767)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1767)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1264)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1263)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)

Nothing here mentions the error that actually occurred in the command. You have to search the executor logs to find it:

ls: fileThatDontExist: No such file or directory

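Looking at the code of PipedRDD, it seems possible to add more information when the exception is thrown (for example, adding the content of proc.getErrorStream to the message):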

val exitStatus = proc.waitFor()
if (exitStatus != 0) {
  throw new Exception("Subprocess exited with status " + exitStatus)
}
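To make the idea concrete, here is a minimal standalone sketch of a process runner that includes stderr in the exception message. It is not Spark's code: the object name `PipeWithStderr` and the method `runCommand` are made up for illustration, and it uses only `java.lang.ProcessBuilder` from the JDK. Stderr is drained on a separate thread so that a process writing a lot to stderr cannot block on a full pipe.

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper, not part of Spark.
object PipeWithStderr {
  /** Reads all lines from a stream, then closes it. */
  private def readLines(in: InputStream): Vector[String] = {
    val reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))
    try Iterator.continually(reader.readLine()).takeWhile(_ != null).toVector
    finally reader.close()
  }

  /** Runs `command`, returns its stdout lines, and throws with stderr attached on failure. */
  def runCommand(command: Seq[String]): Vector[String] = {
    val proc = new ProcessBuilder(command: _*).start()
    proc.getOutputStream.close() // this sketch sends no input to the process

    // Collect stderr concurrently while the current thread reads stdout.
    val stderr = new AtomicReference[Vector[String]](Vector.empty)
    val drainer = new Thread(new Runnable {
      def run(): Unit = stderr.set(readLines(proc.getErrorStream))
    })
    drainer.start()

    val stdout = readLines(proc.getInputStream)
    val exitStatus = proc.waitFor()
    drainer.join()

    if (exitStatus != 0) {
      throw new Exception(
        "Subprocess exited with status " + exitStatus + ": " + stderr.get.mkString("\n"))
    }
    stdout
  }
}
```

With something like this, running `ls fileThatDontExist` would fail with a message that carries the `ls` error text rather than only the exit status.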

I have two questions about this. Is there a reason why this is not done? And does anyone know of a workaround?

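For now I have wrapped the process execution: when the process fails, I return 0 and output the stderr of the process plus a marker. Then I map the RDD, and any line that contains the marker throws an exception carrying the stderr.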
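A rough sketch of that workaround, under some assumptions: the marker string `__PIPE_ERROR__`, the object `MarkerWorkaround`, and its methods are hypothetical names, and the wrapper relies on a POSIX `sh` with `mktemp` and `sed` being available on the executors. The wrapper always exits 0 and, on failure, echoes the captured stderr on stdout with the marker in front of each line.

```scala
// Hypothetical helper illustrating the marker approach; not part of Spark.
object MarkerWorkaround {
  // Assumed marker; choose a string that cannot appear in normal output.
  val Marker = "__PIPE_ERROR__"

  /** Wraps a single-line shell command so it always exits 0 and reports failures on stdout. */
  def wrap(command: String): Seq[String] = {
    val script =
      s"""err=$$(mktemp)
         |$command 2>"$$err"
         |status=$$?
         |if [ "$$status" -ne 0 ]; then
         |  echo "${Marker}exit status $$status"
         |  sed "s/^/$Marker/" "$$err"
         |fi
         |rm -f "$$err"
         |exit 0
         |""".stripMargin
    Seq("sh", "-c", script)
  }

  /** Passes normal lines through and throws on any line that carries the marker. */
  def failOnMarker(line: String): String =
    if (line.startsWith(Marker))
      throw new RuntimeException("Subprocess failed: " + line.stripPrefix(Marker))
    else line
}
```

In a Spark job this might be used roughly as `rdd.pipe(MarkerWorkaround.wrap("ls fileThatDontExist")).map(MarkerWorkaround.failOnMarker).collect`, using the `Seq[String]` overload of `pipe` so that the shell script is passed as a single argument. The exception then reaches the driver with the stderr text in its message.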

As of now (Spark 1.6), the current behaviour is to print the stderr of the spawned process to the standard error output of the executor. This seems to be a very early choice by Spark's own creator, Matei Zaharia, as you can see here, dating back to 2011. I don't see any other way to collect stderr in the current implementation.

A change was recently made for Spark 2.0 to propagate any exception from the child process to the calling process (see SPARK-13793), and a minor change has been made to the exception thrown when the exit status is different from 0 (see this line).

This could be proposed as an improvement. Let me know if you need any help suggesting it as an enhancement to Spark.