如何在 (Py)Spark Structured Streaming 中捕获不正确(损坏的)JSON 记录?
How to capture incorrect (corrupt) JSON records in (Py)Spark Structured Streaming?
我有一个 Azure Eventhub,它是流式数据(JSON 格式)。
我将其作为 Spark 数据帧读取,将传入的 "body" 解析为 from_json(col("body"), schema)
,其中 schema
是预定义的。在代码中,它看起来像:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *
schema = StructType().add(...) # define the incoming JSON schema
df_stream_input = (spark
.readStream
.format("eventhubs")
.options(**ehConfInput)
.load()
.select(from_json(col("body").cast("string"), schema)
)
现在 = 如果传入的 JSON 的架构与定义的架构之间存在一些 不一致 (例如,源 eventhub 开始以新格式发送数据而不注意),from_json()
函数不会抛出错误 = 相反,它会将 NULL
放入字段中,这些字段出现在我的 schema
定义中,但不在JSONs eventhub 发送.
我想捕获此信息并将其记录在某处(Spark 的 log4j、Azure Monitor、警告电子邮件等)。
我的问题是:实现此目标的最佳方法是什么。
我的一些想法:
我能想到的第一件事是有一个 UDF,它检查 NULLs
,如果有任何问题,它会引发异常。我相信不可能通过 PySpark 将日志发送到 log4j,因为 "spark" 上下文无法在 UDF 中启动(在工作人员上)并且人们想使用默认值:
log4jLogger = sc._jvm.org.apache.log4j
记录器 = log4jLogger.LogManager.getLogger('PySpark Logger')
我能想到的第二件事是使用 "foreach/foreachBatch" 函数并将此检查逻辑放在那里。
但我觉得这两种方法都像是.. 太多的习惯 - 我希望 Spark 有一些内置的东西用于这些目的。
tl;dr 您必须使用 foreach
或 foreachBatch
运算符自己执行此检查逻辑。
原来我错误地认为 columnNameOfCorruptRecord
选项可能是一个答案。它不会起作用。
首先,由于this:
,它不会工作
case _: BadRecordException => null
其次,由于 this 会简单地禁用任何其他解析模式(包括似乎与 columnNameOfCorruptRecord
选项一起使用的 PERMISSIVE
):
new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))
换句话说,您唯一的选择是使用列表中的第二项,即 foreach
或 foreachBatch
并自行处理损坏的记录。
解决方案可以使用 from_json
,同时保留初始的 body
列。任何具有不正确 JSON 的记录都将以结果列 null
结束,并且 foreach*
会捕获它,例如
def handleCorruptRecords:
// if json == null the body was corrupt
// handle it
df_stream_input = (spark
.readStream
.format("eventhubs")
.options(**ehConfInput)
.load()
.select("body", from_json(col("body").cast("string"), schema).as("json"))
).foreach(handleCorruptRecords).start()
我有一个 Azure Eventhub,它是流式数据(JSON 格式)。
我将其作为 Spark 数据帧读取,将传入的 "body" 解析为 from_json(col("body"), schema)
,其中 schema
是预定义的。在代码中,它看起来像:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *
schema = StructType().add(...) # define the incoming JSON schema
df_stream_input = (spark
.readStream
.format("eventhubs")
.options(**ehConfInput)
.load()
.select(from_json(col("body").cast("string"), schema)
)
现在 = 如果传入的 JSON 的架构与定义的架构之间存在一些 不一致 (例如,源 eventhub 开始以新格式发送数据而不注意),from_json()
函数不会抛出错误 = 相反,它会将 NULL
放入字段中,这些字段出现在我的 schema
定义中,但不在JSONs eventhub 发送.
我想捕获此信息并将其记录在某处(Spark 的 log4j、Azure Monitor、警告电子邮件等)。
我的问题是:实现此目标的最佳方法是什么。
我的一些想法:
我能想到的第一件事是有一个 UDF,它检查
NULLs
,如果有任何问题,它会引发异常。我相信不可能通过 PySpark 将日志发送到 log4j,因为 "spark" 上下文无法在 UDF 中启动(在工作人员上)并且人们想使用默认值:log4jLogger = sc._jvm.org.apache.log4j 记录器 = log4jLogger.LogManager.getLogger('PySpark Logger')
我能想到的第二件事是使用 "foreach/foreachBatch" 函数并将此检查逻辑放在那里。
但我觉得这两种方法都像是.. 太多的习惯 - 我希望 Spark 有一些内置的东西用于这些目的。
tl;dr 您必须使用 foreach
或 foreachBatch
运算符自己执行此检查逻辑。
原来我错误地认为 columnNameOfCorruptRecord
选项可能是一个答案。它不会起作用。
首先,由于this:
,它不会工作case _: BadRecordException => null
其次,由于 this 会简单地禁用任何其他解析模式(包括似乎与 columnNameOfCorruptRecord
选项一起使用的 PERMISSIVE
):
new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))
换句话说,您唯一的选择是使用列表中的第二项,即 foreach
或 foreachBatch
并自行处理损坏的记录。
解决方案可以使用 from_json
,同时保留初始的 body
列。任何具有不正确 JSON 的记录都将以结果列 null
结束,并且 foreach*
会捕获它,例如
def handleCorruptRecords:
// if json == null the body was corrupt
// handle it
df_stream_input = (spark
.readStream
.format("eventhubs")
.options(**ehConfInput)
.load()
.select("body", from_json(col("body").cast("string"), schema).as("json"))
).foreach(handleCorruptRecords).start()