如何将一列 JSON 字符串转换为镶木地板 table
How do I convert a column of JSON strings into a parquet table
我正在尝试将我收到的一些数据转换成镶木地板 table,我最终可以将其用于报告,但感觉我错过了一步。
我收到 CSV 文件,格式为“id”、“event”、“source”,其中“event”列是 GZIP 压缩的 JSON 字符串。我已经能够设置一个数据框来提取三列,包括解压缩 JSON 字符串。所以我有一个 table 现在有
id | event | source | unencoded_event
其中 unencoded_event
是 JSON 字符串。
此时我想做的是获取 JSON 的一个字符串列并将其解析为单独的列。根据另一位开发人员的评论(转换为 parquet 的过程足够聪明,只需使用我的结果的第一行来找出模式),我试过这个:
df1 = spark.read.json(df.select("unencoded_event").rdd).write.format("parquet").saveAsTable("test")
但这只给了我一个单独的列 table 和一个 _corrupt_record
的列,它再次具有 JSON 字符串。
我想要达到的目的是采用模式:
{
"agent"
--"name"
--"organization"
"entity"
--"name"
----"type"
----"value"
}
并使 table 最终看起来像:
AgentName | Organization | EventType | EventValue
我遗漏的步骤是明确定义模式还是我过度简化了我的方法?
这里的潜在并发症:JSON 模式实际上比上面更复杂;我一直假设我可以将完整的架构扩展到更宽的 table,然后 return 我关心的较小的集合。
我还尝试从文件中获取单个结果(因此,单个 JSON 字符串),将其保存为 JSON 文件并尝试从中读取。这样做是有效的,即,执行 spark.read.json(myJSON.json)
将字符串解析为我期望的数组。如果我复制多个字符串也是如此。
如果我获取原始结果并尝试保存它们,这将不起作用。如果我尝试仅将字符串列保存为 json 文件
dfWrite = df.select(col("unencoded_event"))
dfWrite.write.mode("overwrite").json(write_location)
然后将它们读回,这与以往不同...每一行仍被视为字符串。
我确实找到了一种有效的解决方案。这不是一个完美的解决方案(我担心它不可扩展),但它让我到达了我需要去的地方。
我可以 select 使用 get_json_object()
为我想要的每一列数据(抱歉,我一整天都在摆弄列名等):
dfResults = df.select(get_json_object("unencoded_event", "$.agent[0].name").alias("userID"),
get_json_object("unencoded_event", "$.entity[0].identifier.value").alias("itemID"),
get_json_object("unencoded_event", "$.entity[0].detail[1].value").alias("itemInfo"),
get_json_object("unencoded_event", "$.recorded").alias("timeStamp"))
我最不喜欢的是它似乎无法将 filter/search 选项与 get_json_object()
一起使用。这在可预见的未来很好,因为现在我知道所有数据应该放在哪里,不需要过滤。
我相信我也可以使用 from_json()
但这需要在笔记本中定义模式。这不是一个很好的选择,因为我只需要 JSON 的一小部分,所以定义整个架构感觉像是不必要的工作。 (我也无法控制整体架构,因此这成为维护问题。)
我正在尝试将我收到的一些数据转换成镶木地板 table,我最终可以将其用于报告,但感觉我错过了一步。
我收到 CSV 文件,格式为“id”、“event”、“source”,其中“event”列是 GZIP 压缩的 JSON 字符串。我已经能够设置一个数据框来提取三列,包括解压缩 JSON 字符串。所以我有一个 table 现在有
id | event | source | unencoded_event
其中 unencoded_event
是 JSON 字符串。
此时我想做的是获取 JSON 的一个字符串列并将其解析为单独的列。根据另一位开发人员的评论(转换为 parquet 的过程足够聪明,只需使用我的结果的第一行来找出模式),我试过这个:
df1 = spark.read.json(df.select("unencoded_event").rdd).write.format("parquet").saveAsTable("test")
但这只给了我一个单独的列 table 和一个 _corrupt_record
的列,它再次具有 JSON 字符串。
我想要达到的目的是采用模式:
{
"agent"
--"name"
--"organization"
"entity"
--"name"
----"type"
----"value"
}
并使 table 最终看起来像:
AgentName | Organization | EventType | EventValue
我遗漏的步骤是明确定义模式还是我过度简化了我的方法?
这里的潜在并发症:JSON 模式实际上比上面更复杂;我一直假设我可以将完整的架构扩展到更宽的 table,然后 return 我关心的较小的集合。
我还尝试从文件中获取单个结果(因此,单个 JSON 字符串),将其保存为 JSON 文件并尝试从中读取。这样做是有效的,即,执行 spark.read.json(myJSON.json)
将字符串解析为我期望的数组。如果我复制多个字符串也是如此。
如果我获取原始结果并尝试保存它们,这将不起作用。如果我尝试仅将字符串列保存为 json 文件
dfWrite = df.select(col("unencoded_event"))
dfWrite.write.mode("overwrite").json(write_location)
然后将它们读回,这与以往不同...每一行仍被视为字符串。
我确实找到了一种有效的解决方案。这不是一个完美的解决方案(我担心它不可扩展),但它让我到达了我需要去的地方。
我可以 select 使用 get_json_object()
为我想要的每一列数据(抱歉,我一整天都在摆弄列名等):
dfResults = df.select(get_json_object("unencoded_event", "$.agent[0].name").alias("userID"),
get_json_object("unencoded_event", "$.entity[0].identifier.value").alias("itemID"),
get_json_object("unencoded_event", "$.entity[0].detail[1].value").alias("itemInfo"),
get_json_object("unencoded_event", "$.recorded").alias("timeStamp"))
我最不喜欢的是它似乎无法将 filter/search 选项与 get_json_object()
一起使用。这在可预见的未来很好,因为现在我知道所有数据应该放在哪里,不需要过滤。
我相信我也可以使用 from_json()
但这需要在笔记本中定义模式。这不是一个很好的选择,因为我只需要 JSON 的一小部分,所以定义整个架构感觉像是不必要的工作。 (我也无法控制整体架构,因此这成为维护问题。)