在 spark 结构化流中写入来自 kafka / json 数据源的损坏数据
writing corrupt data from kafka / json datasource in spark structured streaming
在 spark 批处理作业中,我通常将 JSON 数据源写入文件,并且可以使用 DataFrame 的损坏列功能 reader 将损坏的数据写出到单独的位置,以及另一个reader 从同一个作业中写入有效数据。 (数据写成parquet)
但在 Spark Structred Streaming 中,我首先通过 kafka 将流作为字符串读取,然后使用 from_json 获取我的 DataFrame。然后 from_json 使用 JsonToStructs,它在解析器中使用 FailFast 模式,而不是 return 未解析的字符串到 DataFrame 中的列。 (请参阅参考文献中的注释)然后如何使用 SSS 将与我的模式不匹配且可能无效 JSON 的损坏数据写入另一个位置?
终于在批处理作业中,同一个作业可以写入两个数据帧。但是 Spark Structured Streaming 需要对多个接收器进行特殊处理。然后在 Spark 2.3.1(我当前的版本)中,我们应该包含有关如何正确写入损坏和无效流的详细信息...
参考:https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Expression-JsonToStructs.html
val rawKafkaDataFrame=spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", config.broker)
.option("kafka.ssl.truststore.location", path.toString)
.option("kafka.ssl.truststore.password", config.pass)
.option("kafka.ssl.truststore.type", "JKS")
.option("kafka.security.protocol", "SSL")
.option("subscribe", config.topic)
.option("startingOffsets", "earliest")
.load()
val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
// does not provide a corrupt column or way to work with corrupt
jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
当您从字符串转换为 json 时,如果它无法使用提供的架构进行解析,它将 return 为空。您可以过滤空值和 select 字符串。像这样。
val jsonDF = jsonDataFrame.withColumn("json", from_json(col("value"), schema))
val invalidJsonDF = jsonDF.filter(col("json").isNull).select("value")
我只是想找出 _corrupt_record 结构化流的等价物。这是我想出的;希望它能让你更接近你正在寻找的东西:
// add a status column to partition our output by
// optional: only keep the unparsed json if it was corrupt
// writes up to 2 subdirs: 'out.par/status=OK' and 'out.par/status=CORRUPT'
// additional status codes for validation of nested fields could be added in similar fashion
df.withColumn("struct", from_json($"value", schema))
.withColumn("status", when($"struct".isNull, lit("CORRUPT")).otherwise(lit("OK")))
.withColumn("value", when($"status" <=> lit("CORRUPT"), $"value"))
.write
.partitionBy("status")
.parquet("out.par")
在 spark 批处理作业中,我通常将 JSON 数据源写入文件,并且可以使用 DataFrame 的损坏列功能 reader 将损坏的数据写出到单独的位置,以及另一个reader 从同一个作业中写入有效数据。 (数据写成parquet)
但在 Spark Structred Streaming 中,我首先通过 kafka 将流作为字符串读取,然后使用 from_json 获取我的 DataFrame。然后 from_json 使用 JsonToStructs,它在解析器中使用 FailFast 模式,而不是 return 未解析的字符串到 DataFrame 中的列。 (请参阅参考文献中的注释)然后如何使用 SSS 将与我的模式不匹配且可能无效 JSON 的损坏数据写入另一个位置?
终于在批处理作业中,同一个作业可以写入两个数据帧。但是 Spark Structured Streaming 需要对多个接收器进行特殊处理。然后在 Spark 2.3.1(我当前的版本)中,我们应该包含有关如何正确写入损坏和无效流的详细信息...
参考:https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Expression-JsonToStructs.html
val rawKafkaDataFrame=spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", config.broker)
.option("kafka.ssl.truststore.location", path.toString)
.option("kafka.ssl.truststore.password", config.pass)
.option("kafka.ssl.truststore.type", "JKS")
.option("kafka.security.protocol", "SSL")
.option("subscribe", config.topic)
.option("startingOffsets", "earliest")
.load()
val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
// does not provide a corrupt column or way to work with corrupt
jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
当您从字符串转换为 json 时,如果它无法使用提供的架构进行解析,它将 return 为空。您可以过滤空值和 select 字符串。像这样。
val jsonDF = jsonDataFrame.withColumn("json", from_json(col("value"), schema))
val invalidJsonDF = jsonDF.filter(col("json").isNull).select("value")
我只是想找出 _corrupt_record 结构化流的等价物。这是我想出的;希望它能让你更接近你正在寻找的东西:
// add a status column to partition our output by
// optional: only keep the unparsed json if it was corrupt
// writes up to 2 subdirs: 'out.par/status=OK' and 'out.par/status=CORRUPT'
// additional status codes for validation of nested fields could be added in similar fashion
df.withColumn("struct", from_json($"value", schema))
.withColumn("status", when($"struct".isNull, lit("CORRUPT")).otherwise(lit("OK")))
.withColumn("value", when($"status" <=> lit("CORRUPT"), $"value"))
.write
.partitionBy("status")
.parquet("out.par")