Spark Streaming 异常处理策略
Spark Streaming exception handling strategies
我有一个 pyspark 流作业,它从 s3 流式传输目录(使用 textFileStream
)。输入的每一行都被解析并输出到 hdfs 上的 parquet 格式。
这在正常情况下效果很好。但是,当发生以下错误情况之一时,我有哪些选项可以恢复丢失的批量数据?
- 在调用
foreachRDD
的驱动程序中发生异常,其中发生输出操作(可能HdfsError
,或者在诸如partitionBy或[的输出操作期间出现spark sql异常=13=]).据我所知,这在 Spark 中被归类为 "action"(相对于 "transformation")。
- 执行器出现异常,可能是解析一行时map() lambda出现异常
我正在构建的系统必须是记录系统。我的所有输出语义都符合 exactly-once 输出语义的 spark 流文档(如果必须重新计算 batch/RDD,输出数据将被覆盖,而不是重复)。
如何处理输出操作中的失败(在 foreachRDD
内)? AFAICT,foreachRDD
内发生的异常不会导致流式作业停止。事实上,我已经尝试确定如何在 foreachRDD
中生成未处理的异常以停止作业,但一直无法做到。
假设驱动程序中发生未处理的异常。如果我需要更改代码来解决异常,我的理解是我需要在恢复之前删除检查点。在这种情况下,有没有办法从流作业停止的时间戳过去开始流作业?
一般来说,传递给类似 mapPartitions 的操作(map
、filter
、flatMap
)的函数内部抛出的每个异常都应该是可恢复的。没有充分的理由让整个操作/转换在单个格式错误的输入上失败。确切的策略将取决于您的要求(忽略、记录、保留以供进一步处理)。您可以在 What is the equivalent to scala.util.Try in pyspark?
中找到一些想法
处理操作范围内的故障肯定更难。由于通常它可能无法恢复,或者由于传入流量而无法等待,所以我会在失败的情况下乐观地重试,如果它不成功,则将原始数据推送到外部备份系统(例如 S3)。
我有一个 pyspark 流作业,它从 s3 流式传输目录(使用 textFileStream
)。输入的每一行都被解析并输出到 hdfs 上的 parquet 格式。
这在正常情况下效果很好。但是,当发生以下错误情况之一时,我有哪些选项可以恢复丢失的批量数据?
- 在调用
foreachRDD
的驱动程序中发生异常,其中发生输出操作(可能HdfsError
,或者在诸如partitionBy或[的输出操作期间出现spark sql异常=13=]).据我所知,这在 Spark 中被归类为 "action"(相对于 "transformation")。 - 执行器出现异常,可能是解析一行时map() lambda出现异常
我正在构建的系统必须是记录系统。我的所有输出语义都符合 exactly-once 输出语义的 spark 流文档(如果必须重新计算 batch/RDD,输出数据将被覆盖,而不是重复)。
如何处理输出操作中的失败(在 foreachRDD
内)? AFAICT,foreachRDD
内发生的异常不会导致流式作业停止。事实上,我已经尝试确定如何在 foreachRDD
中生成未处理的异常以停止作业,但一直无法做到。
假设驱动程序中发生未处理的异常。如果我需要更改代码来解决异常,我的理解是我需要在恢复之前删除检查点。在这种情况下,有没有办法从流作业停止的时间戳过去开始流作业?
一般来说,传递给类似 mapPartitions 的操作(map
、filter
、flatMap
)的函数内部抛出的每个异常都应该是可恢复的。没有充分的理由让整个操作/转换在单个格式错误的输入上失败。确切的策略将取决于您的要求(忽略、记录、保留以供进一步处理)。您可以在 What is the equivalent to scala.util.Try in pyspark?
处理操作范围内的故障肯定更难。由于通常它可能无法恢复,或者由于传入流量而无法等待,所以我会在失败的情况下乐观地重试,如果它不成功,则将原始数据推送到外部备份系统(例如 S3)。