PySpark: How to create a JSON structure?

I am trying to create a JSON from the following structure.

Sample data:

+---------+---------+---------+---------+
|  Country|SegmentID|total_cnt|max_value|
+---------+---------+---------+---------+
|     Pune|        1|     10.0|       15|
|    Delhi|        1|     10.0|       15|
|Bangalore|        1|     10.0|       15|
|     Pune|        2|     10.0|       16|
|    Delhi|        2|     10.0|       16|
|Bangalore|        2|     10.0|       16|
|     Pune|        3|     15.0|       16|
|    Delhi|        3|     10.0|       16|
|Bangalore|        3|     15.0|       16|
+---------+---------+---------+---------+

Expected JSON structure:

[{
        "NAME": "SEG1",
        "VAL": 15,
        "CITIES": {
            "Bangalore": 10,
            "Delhi": 10,
            "Pune": 10
        }
    },
    {
        "NAME": "SEG2",
        "VAL": 16,
        "CITIES": {
            "Bangalore": 10,
            "Delhi": 10,
            "Pune": 10
        }
    },
    {
        "NAME": "SEG3",
        "VAL": 16,
        "CITIES": {
            "Bangalore": 15,
            "Delhi": 10,
            "Pune": 15
        }
    }
]

I can create a one-level hierarchy, but that does not meet my requirement either:

join_df = join_df.toPandas()
j = (join_df.groupby(['SegmentID', 'max_value'], as_index=False)
            .apply(lambda x: x[['Country', 'total_cnt']].to_dict('records'))
            .reset_index().rename(columns={0: 'CITIES'})
            .to_json(orient='records'))

Which gives this result:

[{"SegmentID": 1, "max_value": 15, "CITIES": [{"Country": "Pune", "total_cnt": 10.0}, {"Country": "Delhi", "total_cnt": 10.0}, {"Country": "Bangalore", "total_cnt": 10.0}]},
 {"SegmentID": 2, "max_value": 16, "CITIES": [{"Country": "Pune", "total_cnt": 10.0}, {"Country": "Delhi", "total_cnt": 10.0}, {"Country": "Bangalore", "total_cnt": 10.0}]},
 {"SegmentID": 3, "max_value": 16, "CITIES": [{"Country": "Pune", "total_cnt": 15.0}, {"Country": "Delhi", "total_cnt": 10.0}, {"Country": "Bangalore", "total_cnt": 15.0}]}]
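As a sketch of the remaining step (plain Python, no Spark needed): the grouped output above can be reshaped into the desired NAME/VAL/CITIES form with a dict comprehension. Here `records` stands in for `json.loads(j)`; the sample values are taken from the first segment of the data.

```python
import json

# `records` stands in for json.loads(j) from the pandas approach above;
# only segment 1 is shown here for brevity.
records = [
    {"SegmentID": 1, "max_value": 15,
     "CITIES": [{"Country": "Pune", "total_cnt": 10.0},
                {"Country": "Delhi", "total_cnt": 10.0},
                {"Country": "Bangalore", "total_cnt": 10.0}]},
]

# Reshape each record into the expected NAME/VAL/CITIES structure.
result = [
    {"NAME": "SEG" + str(r["SegmentID"]),
     "VAL": r["max_value"],
     "CITIES": {c["Country"]: c["total_cnt"] for c in r["CITIES"]}}
    for r in records
]
print(json.dumps(result, indent=4))
```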

You can convert the DataFrame to an RDD and apply your transformations:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import json

NewSchema = StructType([StructField("Name", StringType()),
                        StructField("VAL", IntegerType()),
                        StructField("CITIES", StringType())])

def reduceKeys(row1, row2):
    # Merge the per-city dicts; max_value is identical within a segment,
    # so keeping the one from row1 is safe.
    row1[0].update(row2[0])
    return row1

# Each row is (Country, SegmentID, total_cnt, max_value).
res_df = join_df.rdd.map(lambda row: ("SEG" + str(row[1]), ({row[0]: row[2]}, row[3])))\
    .reduceByKey(reduceKeys)\
    .map(lambda row: (row[0], row[1][1], json.dumps(row[1][0])))\
    .toDF(NewSchema)

The result:

res_df.show(20, False)


+----+---+------------------------------------------------+
|Name|VAL|CITIES                                          |
+----+---+------------------------------------------------+
|SEG1|15 |{"Pune": 10.0, "Delhi": 10.0, "Bangalore": 10.0}|
|SEG3|16 |{"Pune": 15.0, "Delhi": 10.0, "Bangalore": 15.0}|
|SEG2|16 |{"Pune": 10.0, "Delhi": 10.0, "Bangalore": 10.0}|
+----+---+------------------------------------------------+
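To see why `reduceByKey` produces those merged city dicts, the merge step can be simulated in plain Python without Spark; the sample pairs below mirror segment 3 of the data and are illustrative assumptions.

```python
import json
from functools import reduce

def reduceKeys(row1, row2):
    # Same merge logic as in the Spark job: fold the per-city dicts
    # together, keeping the shared max_value from the first tuple.
    row1[0].update(row2[0])
    return row1

# The ({city: total_cnt}, max_value) tuples the map step would emit
# for key "SEG3" (values mirror the sample data).
pairs = [({"Pune": 15.0}, 16), ({"Delhi": 10.0}, 16), ({"Bangalore": 15.0}, 16)]

merged = reduce(reduceKeys, pairs)
row = ("SEG3", merged[1], json.dumps(merged[0]))
```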

Now you can save it as JSON (note that Spark writes a directory of part files at that path, not a single file):

res_df.coalesce(1).write.format('json').save('output.json')
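If a single JSON array is needed instead of Spark's JSON-lines part files, and the aggregated result is small, one sketch is to collect it to the driver and assemble the array with the standard library. Here `rows` stands in for `res_df.collect()`; the sample tuples are assumptions that follow the (Name, VAL, CITIES-as-JSON-string) schema above.

```python
import json

# `rows` stands in for res_df.collect(); each tuple follows the
# (Name, VAL, CITIES) schema, with CITIES still a JSON string.
rows = [
    ("SEG1", 15, '{"Pune": 10.0, "Delhi": 10.0, "Bangalore": 10.0}'),
    ("SEG3", 16, '{"Pune": 15.0, "Delhi": 10.0, "Bangalore": 15.0}'),
]

# Parse the nested CITIES string so the final output is one JSON array.
payload = [{"NAME": name, "VAL": val, "CITIES": json.loads(cities)}
           for name, val, cities in rows]
text = json.dumps(payload, indent=4)
print(text)
```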