PySpark: How to create a JSON structure?
I am trying to build a JSON document from the following data.
Sample data:
+---------+---------+---------+---------+
|  Country|SegmentID|total_cnt|max_value|
+---------+---------+---------+---------+
|     Pune|        1|     10.0|       15|
|    Delhi|        1|     10.0|       15|
|Bangalore|        1|     10.0|       15|
|     Pune|        2|     10.0|       16|
|    Delhi|        2|     10.0|       16|
|Bangalore|        2|     10.0|       16|
|     Pune|        3|     15.0|       16|
|    Delhi|        3|     10.0|       16|
|Bangalore|        3|     15.0|       16|
+---------+---------+---------+---------+
Expected JSON structure:
[{
    "NAME": "SEG1",
    "VAL": 15,
    "CITIES": {
        "Bangalore": 10,
        "Delhi": 10,
        "Pune": 10
    }
},
{
    "NAME": "SEG2",
    "VAL": 16,
    "CITIES": {
        "Bangalore": 10,
        "Delhi": 10,
        "Pune": 10
    }
},
{
    "NAME": "SEG3",
    "VAL": 16,
    "CITIES": {
        "Bangalore": 15,
        "Delhi": 10,
        "Pune": 15
    }
}]
I can create one level of hierarchy, but that does not meet my requirement either. Here is my code:
join_df = join_df.toPandas()
j = (join_df.groupby(['SegmentID', 'max_value'], as_index=False)
            .apply(lambda x: x[['Country', 'total_cnt']].to_dict('records'))
            .reset_index()
            .rename(columns={0: 'CITIES'})
            .to_json(orient='records'))
The result looks like this:
[{"SegmentID":1,"max_value":15,"Cities":[{"Country":"Pune","total_cnt":10.0},{"Country":"Delhi","total_cnt":10.0},{"Country":"Bangalore","total_cnt":10.0}]},{"SegmentID":2,"max_value":16,"Cities":[{"Country":"Pune","total_cnt":10.0},{"Country":"Delhi","total_cnt":10.0},{"Country":"Bangalore","total_cnt":10.0}]},{"SegmentID":3,"max_value":16,"Cities":[{"Country":"Pune","total_cnt":15.0},{"Country":"Delhi","total_cnt":10.0},{"Country":"Bangalore","total_cnt":15.0}]}]
You can convert the DataFrame to an RDD and apply your transformations there:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import json

NewSchema = StructType([StructField("Name", StringType()),
                        StructField("VAL", IntegerType()),
                        StructField("CITIES", StringType())])

def reduceKeys(row1, row2):
    # Merge the {city: count} dicts of two rows sharing the same segment key;
    # max_value is identical within a segment, so row1's copy is kept.
    row1[0].update(row2[0])
    return row1

# Key each row by "SEG<SegmentID>" with value ({Country: total_cnt}, max_value),
# merge the city dicts per segment, then serialize the merged dict to a JSON string.
res_df = join_df.rdd.map(lambda row: ("SEG" + str(row[1]), ({row[0]: row[2]}, row[3])))\
                    .reduceByKey(reduceKeys)\
                    .map(lambda row: (row[0], row[1][1], json.dumps(row[1][0])))\
                    .toDF(NewSchema)
The result:
res_df.show(20, False)
+----+---+------------------------------------------------+
|Name|VAL|CITIES |
+----+---+------------------------------------------------+
|SEG1|15 |{"Pune": 10.0, "Delhi": 10.0, "Bangalore": 10.0}|
|SEG3|16 |{"Pune": 15.0, "Delhi": 10.0, "Bangalore": 15.0}|
|SEG2|16 |{"Pune": 10.0, "Delhi": 10.0, "Bangalore": 10.0}|
+----+---+------------------------------------------------+
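If you would rather avoid the RDD round-trip, the same shape can be built with the DataFrame API alone; a sketch assuming Spark 2.4+ (where map_from_entries is available and to_json accepts map columns):

from pyspark.sql import functions as F

res_df2 = (join_df
           .groupBy(F.concat(F.lit('SEG'), F.col('SegmentID').cast('string')).alias('Name'),
                    F.col('max_value').alias('VAL'))
           # collect (Country, total_cnt) pairs per segment, turn them into a
           # map column, and serialize that map as a JSON string
           .agg(F.to_json(F.map_from_entries(
               F.collect_list(F.struct('Country', 'total_cnt')))).alias('CITIES')))

This keeps the counts as doubles (10.0), matching the output of the RDD version.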
Now you can save it to a JSON file:
res_df.coalesce(1).write.format('json').save('output.json')
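Note that Spark writes a directory named output.json containing part files, with one JSON object per line (JSON Lines), not a single top-level array. If you need one file with the exact array layout shown above and the result is small, a sketch like this collects it on the driver and writes it yourself:

import json

rows = [r.asDict() for r in res_df.collect()]
for r in rows:
    r['CITIES'] = json.loads(r['CITIES'])  # CITIES was stored as a JSON string

with open('output.json', 'w') as f:
    json.dump(rows, f, indent=4)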