在 PySpark 中转换面向值 JSON

Converting values-oriented JSON in PySpark

对于我正在处理的项目,我需要将 API 的 JSON 输出读取到 Spark DataFrame 中,以进一步处理到数据湖存储中。但是,JSON 不仅仅是我习惯使用的常规 JSON。我想将下面的 JSON 转换为 Spark DataFrame,以便它可以用于 Delta Lake 处理。有谁知道如何有效地将其转换为 DataFrame?

[
  [
    {
      "name": "Id", 
      "value": "1"
    }, 
    {
      "name": "Firstname", 
      "value": "Foo"
    }
  ],
  [
    {
      "name": "Id", 
      "value": "2"
    }, 
    {
      "name": "Firstname", 
      "value": "Foo"
    },
    {
      "name": "Lastname", 
      "value": "Bar"
    }
  ]
]

旁注:

首选输出是 PySpark DataFrame,如下所示:

我已经尝试读取 JSON 然后进一步处理成一个新的 DataFrame 但这似乎非常低效并且它也无法处理某些行中丢失的列。

text = [[{"name": "Id", "value": "1"}, {"name": "Firstname","value": "Foo"}],[{"name": "Id", "value": "2"}, {"name": "Firstname","value": "Foo"}]]
df = spark.createDataFrame(text)

for itemIndex, item in enumerate(df.collect()):
  print('New record')
  for columnIndex, column in enumerate(df.columns):
    print(item[columnIndex]['name'], ': ', item[columnIndex]['value'])
  print('\n')

您可以使用 spark.read.json 我建议先将您的 json 更改为另一种格式

[
  [
    {
       "Id": "1",
 
       "Firstname": "Foo"
    }
  ],
  [
    {
      "Id": "2",
     "Firstname": "Foo",
     "Lastname": "Bar"
    }
  ]
]



为此你可以使用下面的代码

import json
original_json=json.loads(json_string)

newjson=[]

for items in original_json:
    temp_dict={item['name']:item['value'] for item in items}
    newjson.append(temp_dict)

    
newjson=json.dumps(newjson)
print (newjson)

f=open('yourjsonfile.json','w')
f.write(newjson)
f.close()
df = spark.read.json("your_json_file.json")
df.printSchema()
df.show()

还要注意您的 json 是多行的,因此您可以使用

df=spark.read.option("multiline","true") \
      .json("your_json_file.json")

不确定您的 JSON 字段的顺序是否始终相同,但这是您的一个选项。请注意,我将您的 JSON 包裹在 两个方括号 中,以便使其成为 单列 .

from pyspark.sql import functions as F

text = [[data]] # note two square brackets
df = spark.createDataFrame(text)

(df
    .select(F.explode('_1').alias('data'))
    .select(
        F.col('data')[0]['value'].alias('Id'),
        F.col('data')[1]['value'].alias('firstname'),
        F.col('data')[2]['value'].alias('lastname'),
    )
    .show(10, False)
)

+---+---------+--------+
|id |firstname|lastname|
+---+---------+--------+
|1  |Foo      |null    |
|2  |Foo      |Bar     |
+---+---------+--------+