如何在 (Py)Spark Structured Streaming 中捕获不正确(损坏的)JSON 记录?

How to capture incorrect (corrupt) JSON records in (Py)Spark Structured Streaming?

我有一个 Azure Eventhub,它是流式数据(JSON 格式)。 我将其作为 Spark 数据帧读取,将传入的 "body" 解析为 from_json(col("body"), schema),其中 schema 是预定义的。在代码中,它看起来像:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *

schema = StructType().add(...) # define the incoming JSON schema 

df_stream_input = (spark
.readStream
.format("eventhubs")
.options(**ehConfInput)
.load()
.select(from_json(col("body").cast("string"), schema)
)

现在 = 如果传入的 JSON 的架构与定义的架构之间存在一些 不一致 (例如,源 eventhub 开始以新格式发送数据而不注意),from_json() 函数不会抛出错误 = 相反,它会将 NULL 放入字段中,这些字段出现在我的 schema 定义中,但不在JSONs eventhub 发送.

我想捕获此信息并将其记录在某处(Spark 的 log4j、Azure Monitor、警告电子邮件等)。

我的问题是:实现此目标的最佳方法是什么。

我的一些想法:

  1. 我能想到的第一件事是有一个 UDF,它检查 NULLs,如果有任何问题,它会引发异常。我相信不可能通过 PySpark 将日志发送到 log4j,因为 "spark" 上下文无法在 UDF 中启动(在工作人员上)并且人们想使用默认值:

    log4jLogger = sc._jvm.org.apache.log4j 记录器 = log4jLogger.LogManager.getLogger('PySpark Logger')

  2. 我能想到的第二件事是使用 "foreach/foreachBatch" 函数并将此检查逻辑放在那里。

但我觉得这两种方法都像是.. 太多的习惯 - 我希望 Spark 有一些内置的东西用于这些目的。

tl;dr 您必须使用 foreachforeachBatch 运算符自己执行此检查逻辑。


原来我错误地认为 columnNameOfCorruptRecord 选项可能是一个答案。它不会起作用。

首先,由于this:

,它不会工作
case _: BadRecordException => null

其次,由于 this 会简单地禁用任何其他解析模式(包括似乎与 columnNameOfCorruptRecord 选项一起使用的 PERMISSIVE):

new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))

换句话说,您唯一的选择是使用列表中的第二项,即 foreachforeachBatch 并自行处理损坏的记录。

解决方案可以使用 from_json,同时保留初始的 body 列。任何具有不正确 JSON 的记录都将以结果列 null 结束,并且 foreach* 会捕获它,例如

def handleCorruptRecords:
  // if json == null the body was corrupt
  // handle it

df_stream_input = (spark
  .readStream
  .format("eventhubs")
  .options(**ehConfInput)
  .load()
  .select("body", from_json(col("body").cast("string"), schema).as("json"))
).foreach(handleCorruptRecords).start()