Spark Structured Streaming 从查询异常中恢复
Spark Structured Streaming recovering from a query exception
是否可以从查询执行期间抛出的异常中自动恢复?
上下文: 我正在开发一个 Spark 应用程序,它从 Kafka 主题读取数据、处理数据并输出到 S3。然而,在 运行 生产几天后,spark 应用程序面临来自 S3 的一些网络故障,导致抛出异常并停止应用程序。还值得一提的是,此应用程序使用 GCP's Spark k8s Operator.
在 Kubernetes 上运行
据我目前所见,这些异常是次要的,只需重新启动应用程序即可解决问题。我们可以处理这些异常并自动重启结构化流查询吗?
下面是一个抛出异常的例子:
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
=== Streaming Query ===
Identifier: ...
Current Committed Offsets: ...
Current Available Offsets: ...
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan: ...
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at io.blahblahView$$anonfun$$anonfun$apply.apply(View.scala:90)
at io.blahblahView $$anonfun$$anonfun$apply.apply(View.scala:82)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach.apply(TraversableLike.scala:733)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at io.blahblahView$$anonfun.apply(View.scala:82)
at io.blahblahView$$anonfun.apply(View.scala:79)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$$anonfun$apply.apply(MicroBatchExecution.scala:537)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch.apply(MicroBatchExecution.scala:535)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply$mcV$sp(MicroBatchExecution.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
... 1 more
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://.../view/v1/_temporary/0
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:993)
at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:734)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:291)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:361)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:334)
at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:187)
... 47 more
自动处理此类问题的最简单方法是什么?
不,没有可靠的方法可以做到这一点。顺便说一句,否也是一个答案。
检查异常的逻辑一般是通过驱动程序上的 try / catch 运行。
由于 Executor 级别的意外情况已经由 Spark Framework 本身针对结构化流进行了标准处理,如果错误是不可恢复的,那么 App / Job 在发出错误信号后就会崩溃( s) 返回驱动程序,除非您在各种 foreachXXX 构造中编写 try / catch 代码。
- 也就是说,对于 foreachXXX 构造来说,微批次是否可以在这种方法中恢复 afaics 并不清楚,微批次的某些部分很可能会丢失。虽然很难测试。
既然 Spark 有一些标准的东西是你不能挂钩的,为什么可以在程序的源代码中插入一个循环或 try/catch?同样,广播变量也是一个问题——尽管有些人说他们有解决这个问题的技术。但这不符合框架的精神。
所以,这个问题问得好(过去)。
在花了太多时间试图找到解决此问题的优雅方法后,却一无所获,这就是我想出的办法。
有些人可能会说这是一个 hack,但它很简单,它可以解决复杂的问题。我在生产中对其进行了测试,它解决了由于偶尔出现的小异常而自动从故障中恢复的问题。
我称它为查询看门狗。这是看门狗将无限期地重试 运行 查询的最简单版本:
val writer = df.writeStream...
while (true) {
val query = writer.start()
try {
query.awaitTermination()
}
catch {
case e: StreamingQueryException => println("Streaming Query Exception caught!: " + e);
}
}
有些人可能想用某种计数器代替 while(true)
来限制重试次数。有人还可以补充此代码,并在发生重试时通过 slack 或电子邮件发送通知。其他人可以简单地收集普罗米修斯的重试次数。
希望对您有所帮助,
干杯
是否可以从查询执行期间抛出的异常中自动恢复?
上下文: 我正在开发一个 Spark 应用程序,它从 Kafka 主题读取数据、处理数据并输出到 S3。然而,在 运行 生产几天后,spark 应用程序面临来自 S3 的一些网络故障,导致抛出异常并停止应用程序。还值得一提的是,此应用程序使用 GCP's Spark k8s Operator.
在 Kubernetes 上运行据我目前所见,这些异常是次要的,只需重新启动应用程序即可解决问题。我们可以处理这些异常并自动重启结构化流查询吗?
下面是一个抛出异常的例子:
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
=== Streaming Query ===
Identifier: ...
Current Committed Offsets: ...
Current Available Offsets: ...
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan: ...
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at io.blahblahView$$anonfun$$anonfun$apply.apply(View.scala:90)
at io.blahblahView $$anonfun$$anonfun$apply.apply(View.scala:82)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach.apply(TraversableLike.scala:733)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at io.blahblahView$$anonfun.apply(View.scala:82)
at io.blahblahView$$anonfun.apply(View.scala:79)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$$anonfun$apply.apply(MicroBatchExecution.scala:537)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch.apply(MicroBatchExecution.scala:535)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply$mcV$sp(MicroBatchExecution.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
... 1 more
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://.../view/v1/_temporary/0
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:993)
at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:734)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:291)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:361)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:334)
at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:187)
... 47 more
自动处理此类问题的最简单方法是什么?
不,没有可靠的方法可以做到这一点。顺便说一句,否也是一个答案。
检查异常的逻辑一般是通过驱动程序上的 try / catch 运行。
由于 Executor 级别的意外情况已经由 Spark Framework 本身针对结构化流进行了标准处理,如果错误是不可恢复的,那么 App / Job 在发出错误信号后就会崩溃( s) 返回驱动程序,除非您在各种 foreachXXX 构造中编写 try / catch 代码。
- 也就是说,对于 foreachXXX 构造来说,微批次是否可以在这种方法中恢复 afaics 并不清楚,微批次的某些部分很可能会丢失。虽然很难测试。
既然 Spark 有一些标准的东西是你不能挂钩的,为什么可以在程序的源代码中插入一个循环或 try/catch?同样,广播变量也是一个问题——尽管有些人说他们有解决这个问题的技术。但这不符合框架的精神。
所以,这个问题问得好(过去)。
在花了太多时间试图找到解决此问题的优雅方法后,却一无所获,这就是我想出的办法。
有些人可能会说这是一个 hack,但它很简单,它可以解决复杂的问题。我在生产中对其进行了测试,它解决了由于偶尔出现的小异常而自动从故障中恢复的问题。
我称它为查询看门狗。这是看门狗将无限期地重试 运行 查询的最简单版本:
val writer = df.writeStream...
while (true) {
val query = writer.start()
try {
query.awaitTermination()
}
catch {
case e: StreamingQueryException => println("Streaming Query Exception caught!: " + e);
}
}
有些人可能想用某种计数器代替 while(true)
来限制重试次数。有人还可以补充此代码,并在发生重试时通过 slack 或电子邮件发送通知。其他人可以简单地收集普罗米修斯的重试次数。
希望对您有所帮助,
干杯