PySpark:如何从 spark 数据框创建嵌套的 JSON?
PySpark: How to create a nested JSON from spark data frame?
我正在尝试从我的 spark 数据帧创建一个嵌套的 json,它具有以下结构的数据。下面的代码创建了一个简单的 json 键和值。你能帮忙吗
df.coalesce(1).write.format('json').save(data_output_file+"createjson.json", overwrite=True)
更新1:
根据@MaxU 的回答,我将 spark 数据帧转换为 pandas 并使用了分组依据。它将最后两个字段放在嵌套数组中。我怎么能先把类别和计数放在嵌套数组中,然后在那个数组中我想把子类别和计数。
示例文本数据:
Vendor_Name,count,Categories,Category_Count,Subcategory,Subcategory_Count
Vendor1,10,Category 1,4,Sub Category 1,1
Vendor1,10,Category 1,4,Sub Category 2,2
Vendor1,10,Category 1,4,Sub Category 3,3
Vendor1,10,Category 1,4,Sub Category 4,4
j = (data_pd.groupby(['vendor_name','vendor_Cnt','Category','Category_cnt'], as_index=False)
.apply(lambda x: x[['Subcategory','subcategory_cnt']].to_dict('r'))
.reset_index()
.rename(columns={0:'subcategories'})
.to_json(orient='records'))
[{
"vendor_name": "Vendor 1",
"count": 10,
"categories": [{
"name": "Category 1",
"count": 4,
"subCategories": [{
"name": "Sub Category 1",
"count": 1
},
{
"name": "Sub Category 2",
"count": 1
},
{
"name": "Sub Category 3",
"count": 1
},
{
"name": "Sub Category 4",
"count": 1
}
]
}]
您需要为此重新构造整个数据框。
"subCategories" 是一个结构类型。
from pyspark.sql import functions as F
df.withColumn(
"subCategories",
F.struct(
F.col("subCategories").alias("name"),
F.col("subcategory_count").alias("count")
)
)
然后,groupBy 并使用 F.collect_list 创建数组。
最后,您的数据框中只需要有 1 条记录即可获得您期望的结果。
在 python/pandas 中执行此操作的最简单方法是使用一系列使用 groupby
的嵌套生成器,我认为:
def split_df(df):
for (vendor, count), df_vendor in df.groupby(["Vendor_Name", "count"]):
yield {
"vendor_name": vendor,
"count": count,
"categories": list(split_category(df_vendor))
}
def split_category(df_vendor):
for (category, count), df_category in df_vendor.groupby(
["Categories", "Category_Count"]
):
yield {
"name": category,
"count": count,
"subCategories": list(split_subcategory(df_category)),
}
def split_subcategory(df_category):
for row in df.itertuples():
yield {"name": row.Subcategory, "count": row.Subcategory_Count}
list(split_df(df))
[
{
"vendor_name": "Vendor1",
"count": 10,
"categories": [
{
"name": "Category 1",
"count": 4,
"subCategories": [
{"name": "Sub Category 1", "count": 1},
{"name": "Sub Category 2", "count": 2},
{"name": "Sub Category 3", "count": 3},
{"name": "Sub Category 4", "count": 4},
],
}
],
}
]
要将其导出到 json
,您需要一种方法来导出 np.int64
我正在尝试从我的 spark 数据帧创建一个嵌套的 json,它具有以下结构的数据。下面的代码创建了一个简单的 json 键和值。你能帮忙吗
df.coalesce(1).write.format('json').save(data_output_file+"createjson.json", overwrite=True)
更新1: 根据@MaxU 的回答,我将 spark 数据帧转换为 pandas 并使用了分组依据。它将最后两个字段放在嵌套数组中。我怎么能先把类别和计数放在嵌套数组中,然后在那个数组中我想把子类别和计数。
示例文本数据:
Vendor_Name,count,Categories,Category_Count,Subcategory,Subcategory_Count
Vendor1,10,Category 1,4,Sub Category 1,1
Vendor1,10,Category 1,4,Sub Category 2,2
Vendor1,10,Category 1,4,Sub Category 3,3
Vendor1,10,Category 1,4,Sub Category 4,4
j = (data_pd.groupby(['vendor_name','vendor_Cnt','Category','Category_cnt'], as_index=False)
.apply(lambda x: x[['Subcategory','subcategory_cnt']].to_dict('r'))
.reset_index()
.rename(columns={0:'subcategories'})
.to_json(orient='records'))
[{
"vendor_name": "Vendor 1",
"count": 10,
"categories": [{
"name": "Category 1",
"count": 4,
"subCategories": [{
"name": "Sub Category 1",
"count": 1
},
{
"name": "Sub Category 2",
"count": 1
},
{
"name": "Sub Category 3",
"count": 1
},
{
"name": "Sub Category 4",
"count": 1
}
]
}]
您需要为此重新构造整个数据框。
"subCategories" 是一个结构类型。
from pyspark.sql import functions as F
df.withColumn(
"subCategories",
F.struct(
F.col("subCategories").alias("name"),
F.col("subcategory_count").alias("count")
)
)
然后,groupBy 并使用 F.collect_list 创建数组。
最后,您的数据框中只需要有 1 条记录即可获得您期望的结果。
在 python/pandas 中执行此操作的最简单方法是使用一系列使用 groupby
的嵌套生成器,我认为:
def split_df(df):
for (vendor, count), df_vendor in df.groupby(["Vendor_Name", "count"]):
yield {
"vendor_name": vendor,
"count": count,
"categories": list(split_category(df_vendor))
}
def split_category(df_vendor):
for (category, count), df_category in df_vendor.groupby(
["Categories", "Category_Count"]
):
yield {
"name": category,
"count": count,
"subCategories": list(split_subcategory(df_category)),
}
def split_subcategory(df_category):
for row in df.itertuples():
yield {"name": row.Subcategory, "count": row.Subcategory_Count}
list(split_df(df))
[ { "vendor_name": "Vendor1", "count": 10, "categories": [ { "name": "Category 1", "count": 4, "subCategories": [ {"name": "Sub Category 1", "count": 1}, {"name": "Sub Category 2", "count": 2}, {"name": "Sub Category 3", "count": 3}, {"name": "Sub Category 4", "count": 4}, ], } ], } ]
要将其导出到 json
,您需要一种方法来导出 np.int64