如何将一列 JSON 字符串转换为镶木地板 table

How do I convert a column of JSON strings into a parquet table

我正在尝试将我收到的一些数据转换成镶木地板 table,我最终可以将其用于报告,但感觉我错过了一步。

我收到 CSV 文件,格式为“id”、“event”、“source”,其中“event”列是 GZIP 压缩的 JSON 字符串。我已经能够设置一个数据框来提取三列,包括解压缩 JSON 字符串。所以我有一个 table 现在有

id | event | source | unencoded_event

其中 unencoded_event 是 JSON 字符串。

此时我想做的是获取 JSON 的一个字符串列并将其解析为单独的列。根据另一位开发人员的评论(转换为 parquet 的过程足够聪明,只需使用我的结果的第一行来找出模式),我试过这个:

df1 = spark.read.json(df.select("unencoded_event").rdd).write.format("parquet").saveAsTable("test")

但这只给了我一个单独的列 table 和一个 _corrupt_record 的列,它再次具有 JSON 字符串。

我想要达到的目的是采用模式:

{
  "agent"
  --"name"
  --"organization"
  "entity"
  --"name"
  ----"type"
  ----"value"
}

并使 table 最终看起来像: AgentName | Organization | EventType | EventValue

我遗漏的步骤是明确定义模式还是我过度简化了我的方法?

这里的潜在并发症:JSON 模式实际上比上面更复杂;我一直假设我可以将完整的架构扩展到更宽的 table,然后 return 我关心的较小的集合。

我还尝试从文件中获取单个结果(因此,单个 JSON 字符串),将其保存为 JSON 文件并尝试从中读取。这样做是有效的,即,执行 spark.read.json(myJSON.json) 将字符串解析为我期望的数组。如果我复制多个字符串也是如此。

如果我获取原始结果并尝试保存它们,这将不起作用。如果我尝试仅将字符串列保存为 json 文件

dfWrite = df.select(col("unencoded_event"))
dfWrite.write.mode("overwrite").json(write_location)

然后将它们读回,这与以往不同...每一行仍被视为字符串。

我确实找到了一种有效的解决方案。这不是一个完美的解决方案(我担心它不可扩展),但它让我到达了我需要去的地方。

我可以 select 使用 get_json_object() 为我想要的每一列数据(抱歉,我一整天都在摆弄列名等):

dfResults = df.select(get_json_object("unencoded_event", "$.agent[0].name").alias("userID"), 
get_json_object("unencoded_event", "$.entity[0].identifier.value").alias("itemID"),
get_json_object("unencoded_event", "$.entity[0].detail[1].value").alias("itemInfo"),
get_json_object("unencoded_event", "$.recorded").alias("timeStamp"))

我最不喜欢的是它似乎无法将 filter/search 选项与 get_json_object() 一起使用。这在可预见的未来很好,因为现在我知道所有数据应该放在哪里,不需要过滤。

我相信我也可以使用 from_json() 但这需要在笔记本中定义模式。这不是一个很好的选择,因为我只需要 JSON 的一小部分,所以定义整个架构感觉像是不必要的工作。 (我也无法控制整体架构,因此这成为维护问题。)