Spark Structured Streaming recovering from a query exception

Is it possible to automatically recover from an exception thrown during the execution of a query?

Context: I'm developing a Spark application that reads data from a Kafka topic, processes it, and outputs to S3. However, after a few days running in production, the Spark application faced some network failures from S3, which caused an exception to be thrown and stopped the application. It's also worth mentioning that this application runs on Kubernetes using GCP's Spark k8s Operator.

From what I've seen so far, these exceptions are minor and simply restarting the application solves the problem. Can we handle these exceptions and restart the structured streaming query automatically?
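For reference, here is a minimal sketch of the kind of pipeline involved. All names, addresses, and paths are illustrative placeholders, not taken from the actual application; the foreachBatch write is inferred from the ForeachBatchSink frame in the stack trace further below:

    import org.apache.spark.sql.{DataFrame, SparkSession}

    val spark = SparkSession.builder.appName("kafka-to-s3").getOrCreate()

    // Read from a Kafka topic (broker address and topic name are placeholders).
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "some-topic")
      .load()

    // Write each micro-batch out to S3 (paths are placeholders).
    val query = df.writeStream
      .option("checkpointLocation", "s3a://some-bucket/checkpoints")
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        batch.write.mode("append").parquet("s3a://some-bucket/view/v1")
      }
      .start()

    query.awaitTermination()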

Here is an example of an exception that was thrown:

    Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
    === Streaming Query ===
    Identifier: ...
    Current Committed Offsets: ...
    Current Available Offsets: ...

    Current State: ACTIVE
    Thread State: RUNNABLE

    Logical Plan: ...

        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:193)
    Caused by: org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:676)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:676)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
        at io.blahblahView$$anonfun$$anonfun$apply.apply(View.scala:90)
        at io.blahblahView$$anonfun$$anonfun$apply.apply(View.scala:82)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach.apply(TraversableLike.scala:733)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
        at io.blahblahView$$anonfun.apply(View.scala:82)
        at io.blahblahView$$anonfun.apply(View.scala:79)
        at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$$anonfun$apply.apply(MicroBatchExecution.scala:537)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch.apply(MicroBatchExecution.scala:535)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply$mcV$sp(MicroBatchExecution.scala:198)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream.apply$mcZ$sp(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
        ... 1 more
    Caused by: java.io.FileNotFoundException: No such file or directory: s3a://.../view/v1/_temporary/0
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:993)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:734)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:291)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:361)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:334)
        at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:187)
        ... 47 more

What is the simplest way to handle such issues automatically?

No, there is no reliable way to do this. And by the way, "No" is also an answer.

  • The logic for checking for exceptions is generally run via try / catch on the Driver.

  • Since unexpected situations at the Executor level are already handled in a standard way by the Spark framework itself for Structured Streaming, if the error is unrecoverable the App / Job simply crashes after signaling the error(s) back to the Driver, unless you code try / catch in the various foreachXXX constructs (a sketch of what that looks like follows this list).

    • That said, for the foreachXXX constructs it is not clear, as far as I can see, whether a micro-batch is recoverable with this approach; parts of the micro-batch may well be lost. It is hard to test, though.
  • Given that Spark has standard machinery you cannot hook into, why should one be able to insert a loop or try / catch into the program's source? Similarly, broadcast variables are an issue, although some people say they have techniques around that. But that would not be in the spirit of the framework.
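To make the foreachXXX point above concrete, here is a sketch of what a try / catch inside foreachBatch might look like (streamingDF is assumed to be some streaming DataFrame already in scope, and the path is a placeholder):

    import org.apache.spark.sql.DataFrame

    streamingDF.writeStream
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        try {
          batch.write.mode("append").parquet("s3a://some-bucket/view/v1")
        } catch {
          // Swallowing the error keeps the query alive, but the output of this
          // micro-batch may be partially written or lost entirely.
          case e: Exception => println(s"Batch $batchId failed: $e")
        }
      }
      .start()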

So, it was a good question to ask.

After spending way too many hours trying to find an elegant solution to this problem and coming up with nothing, here is what I came up with.

Some may say this is a hack, but it is simple and it solves a complicated problem. I have tested it in production, where it solved the problem of automatically recovering from failures caused by occasional minor exceptions.

I call it the Query Watchdog. Here is the simplest version, in which the watchdog will keep retrying the query indefinitely:

    import org.apache.spark.sql.streaming.StreamingQueryException

    val writer = df.writeStream...

    // Restart the query whenever it dies with a StreamingQueryException.
    while (true) {
      val query = writer.start()

      try {
        query.awaitTermination()
      } catch {
        case e: StreamingQueryException =>
          println("Streaming Query Exception caught!: " + e)
      }
    }

Some may want to replace the while (true) with a counter of some kind to limit the number of retries. Someone could also extend this code to send a notification via Slack or email whenever a retry occurs. Others could simply collect the number of retries in Prometheus.
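As a sketch of the bounded-retry variant (maxRetries and the counter are illustrative additions, reusing writer from the snippet above):

    import org.apache.spark.sql.streaming.StreamingQueryException

    val maxRetries = 5 // illustrative limit
    var retries = 0
    var keepRunning = true

    while (keepRunning && retries < maxRetries) {
      val query = writer.start()
      try {
        query.awaitTermination()
        keepRunning = false // the query stopped cleanly, so do not restart it
      } catch {
        case e: StreamingQueryException =>
          retries += 1
          // A Slack/email notification or a Prometheus counter increment could go here.
          println(s"Streaming Query Exception caught (retry $retries of $maxRetries): $e")
      }
    }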

Hope this helps,

Cheers