PySpark:如何从 spark 数据框创建嵌套的 JSON?

PySpark: How to create a nested JSON from spark data frame?

我正在尝试从我的 spark 数据帧创建一个嵌套的 json,它具有以下结构的数据。下面的代码创建了一个简单的 json 键和值。你能帮忙吗

df.coalesce(1).write.format('json').save(data_output_file+"createjson.json", overwrite=True)

更新1: 根据@MaxU 的回答,我将 spark 数据帧转换为 pandas 并使用了分组依据。它将最后两个字段放在嵌套数组中。我怎么能先把类别和计数放在嵌套数组中,然后在那个数组中我想把子类别和计数。

示例文本数据:

Vendor_Name,count,Categories,Category_Count,Subcategory,Subcategory_Count
Vendor1,10,Category 1,4,Sub Category 1,1
Vendor1,10,Category 1,4,Sub Category 2,2
Vendor1,10,Category 1,4,Sub Category 3,3
Vendor1,10,Category 1,4,Sub Category 4,4

j = (data_pd.groupby(['vendor_name','vendor_Cnt','Category','Category_cnt'], as_index=False)
             .apply(lambda x: x[['Subcategory','subcategory_cnt']].to_dict('r'))
             .reset_index()
             .rename(columns={0:'subcategories'})
             .to_json(orient='records'))

[{
        "vendor_name": "Vendor 1",
        "count": 10,
        "categories": [{
            "name": "Category 1",
            "count": 4,
            "subCategories": [{
                    "name": "Sub Category 1",
                    "count": 1
                },
                {
                    "name": "Sub Category 2",
                    "count": 1
                },
                {
                    "name": "Sub Category 3",
                    "count": 1
                },
                {
                    "name": "Sub Category 4",
                    "count": 1
                }
            ]
        }]

您需要为此重新构造整个数据框。

"subCategories" 是一个结构类型。

from pyspark.sql import functions as F
df.withColumn(
  "subCategories",
  F.struct(
    F.col("subCategories").alias("name"),
    F.col("subcategory_count").alias("count")
  )
)

然后,groupBy 并使用 F.collect_list 创建数组。

最后,您的数据框中只需要有 1 条记录即可获得您期望的结果。

在 python/pandas 中执行此操作的最简单方法是使用一系列使用 groupby 的嵌套生成器,我认为:

def split_df(df):
    for (vendor, count), df_vendor in df.groupby(["Vendor_Name", "count"]):
        yield {
            "vendor_name": vendor,
            "count": count,
            "categories": list(split_category(df_vendor))
        }

def split_category(df_vendor):
    for (category, count), df_category in df_vendor.groupby(
        ["Categories", "Category_Count"]
    ):
        yield {
            "name": category,
            "count": count,
            "subCategories": list(split_subcategory(df_category)),
        }

def split_subcategory(df_category):
    for row in df.itertuples():
        yield {"name": row.Subcategory, "count": row.Subcategory_Count}

list(split_df(df))
[
    {
        "vendor_name": "Vendor1",
        "count": 10,
        "categories": [
            {
                "name": "Category 1",
                "count": 4,
                "subCategories": [
                    {"name": "Sub Category 1", "count": 1},
                    {"name": "Sub Category 2", "count": 2},
                    {"name": "Sub Category 3", "count": 3},
                    {"name": "Sub Category 4", "count": 4},
                ],
            }
        ],
    }
]

要将其导出到 json,您需要一种方法来导出 np.int64