Spark: How to retrieve the original data for which stage has failed?

When an exception occurs during processing, Spark retries it 3 times, as can be seen in the logs below, and then marks the stage as failed. I want to retrieve all the data that the failed stage was processing, so that I can analyze it later or do something else with it. How can this be done? I have been exploring SparkListeners for this, but that seems to be a developer API.

Thanks.

16/03/23 18:33:00 WARN TaskSetManager: Lost task 1.0 in stage 11.0 (TID 88, 192.168.213.53): java.lang.RuntimeException: Amit baby its exception time
    at com.yourcompany.custom.identifier.JavaRecoverableNetworkWordCount.call(JavaRecoverableNetworkWordCount.java:141)
    at com.yourcompany.custom.identifier.JavaRecoverableNetworkWordCount.call(JavaRecoverableNetworkWordCount.java:131)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn.apply(JavaDStreamLike.scala:172)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn.apply(JavaDStreamLike.scala:172)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:203)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

16/03/23 18:33:00 INFO TaskSetManager: Starting task 1.1 in stage 11.0 (TID 89, 192.168.213.53, NODE_LOCAL, 2535 bytes)
16/03/23 18:33:00 INFO TaskSetManager: Lost task 1.1 in stage 11.0 (TID 89) on executor 192.168.213.53: java.lang.RuntimeException (Amit baby its exception time) [duplicate 1]
16/03/23 18:33:00 INFO TaskSetManager: Starting task 1.2 in stage 11.0 (TID 90, 192.168.213.53, NODE_LOCAL, 2535 bytes)
16/03/23 18:33:00 INFO TaskSetManager: Lost task 1.2 in stage 11.0 (TID 90) on executor 192.168.213.53: java.lang.RuntimeException (Amit baby its exception time) [duplicate 2]
16/03/23 18:33:00 INFO TaskSetManager: Starting task 1.3 in stage 11.0 (TID 91, 192.168.213.53, NODE_LOCAL, 2535 bytes)
16/03/23 18:33:00 INFO TaskSetManager: Lost task 1.3 in stage 11.0 (TID 91) on executor 192.168.213.53: java.lang.RuntimeException (Amit baby its exception time) [duplicate 3]
16/03/23 18:33:00 ERROR TaskSetManager: Task 1 in stage 11.0 failed 4 times; aborting job
16/03/23 18:33:00 INFO TaskSchedulerImpl: Removed TaskSet 11.0, whose tasks have all completed, from pool 
16/03/23 18:33:00 INFO TaskSchedulerImpl: Cancelling stage 11

This cannot be done. The data processed in a task normally does not outlive the job it belongs to. By the time the stage has failed, the job no longer exists and the data will be garbage collected. Nothing holds a reference to it anymore, so there is no way to get at it.
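If what you really need are the failing records themselves, the usual application-level workaround is to catch the exception inside your own function and divert the bad input to a side channel before it can fail the task. A rough sketch in the Spark 1.x Java API; here lines stands for your input JavaDStream<String>, and processRecord and saveForAnalysis are hypothetical placeholders for your own logic and your own durable sink:

    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.api.java.JavaDStream;

    // Catch the failure in user code and stash the offending record somewhere
    // durable (HDFS, a queue, ...) instead of letting it abort the task.
    JavaDStream<String> safe = lines.map(new Function<String, String>() {
        @Override
        public String call(String record) {
            try {
                return processRecord(record);  // your real per-record logic (placeholder)
            } catch (RuntimeException e) {
                saveForAnalysis(record, e);    // persist for later analysis (placeholder)
                return null;                   // sentinel, dropped below
            }
        }
    }).filter(new Function<String, Boolean>() {
        @Override
        public Boolean call(String s) {
            return s != null;
        }
    });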

SparkListener is indeed a DeveloperApi, but that does not mean you cannot use it. It is still a public API. It just means it is not guaranteed to stay stable across Spark versions. We have been using SparkListeners for about a year now, and they have actually been very stable. Feel free to give it a try. But I don't think it will solve your problem.
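For reference, a minimal sketch of a listener that reacts to task failures, assuming the Spark 1.x JavaSparkListener adapter class (in Spark 2.x, SparkListener itself is an abstract class you extend directly). Note that it only sees the failure metadata, not the task's input data:

    import org.apache.spark.JavaSparkListener;
    import org.apache.spark.TaskFailedReason;
    import org.apache.spark.scheduler.SparkListenerTaskEnd;

    // Logs every failed task together with Spark's error string.
    public class TaskFailureListener extends JavaSparkListener {
        @Override
        public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
            if (taskEnd.reason() instanceof TaskFailedReason) {
                TaskFailedReason reason = (TaskFailedReason) taskEnd.reason();
                System.err.println("Task " + taskEnd.taskInfo().taskId()
                        + " in stage " + taskEnd.stageId()
                        + " failed: " + reason.toErrorString());
            }
        }
    }

You would register it on the underlying SparkContext, with something like jssc.sparkContext().sc().addSparkListener(new TaskFailureListener());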

That said, it is a valid and interesting idea. Being able to access the data would be a great help for debugging. You could file a feature request in the Spark JIRA. It would not be a simple thing, though: a Spark task is much more complex than just the user code you provide, so even if a task's input were made available for debugging, it would not be trivial to make good use of it. In any case, I think it is worth raising!