Mixed schema datatype JSON to PySpark DataFrame

I need to convert a list of JSON objects into a PySpark DataFrame. The JSON objects all share the same schema. The problem is that the value entries of the dictionaries in the JSON have different data types.

Example: the field complex is an array of dictionaries, and each dictionary has four keys, but of different types (integer, string, float, and a nested dictionary). See the sample JSON below.

If I create my DataFrame from the JSON with df = spark.createDataFrame(json_list), PySpark "drops" some data because it cannot infer the schema correctly. PySpark decides that the schema of the complex field should be StructType("complex", ArrayType(MapType(StringType(), LongType()))), which causes the non-LongType values to be nulled out.
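To see why inference goes wrong, it helps to look at the Python types that actually appear as values in one `complex` entry (a quick stdlib-only check, no Spark needed):

```python
import json

# one entry of the "complex" array from the sample JSON below
entry = json.loads('{"key1": 1, "key2": "(1)", "key3": 0.5, '
                   '"key4": {"innerkey1": "random", "innerkey2": 5.4, "innerkey3": 1}}')

# the values span four different Python types, so no single
# MapType value type can represent all of them
value_types = {k: type(v).__name__ for k, v in entry.items()}
print(value_types)  # {'key1': 'int', 'key2': 'str', 'key3': 'float', 'key4': 'dict'}
```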

I tried to provide a schema, but since I need to set one specific DataType for the value field of the nested MapType, and the values are not uniform but vary...

from pyspark.sql.types import (ArrayType, LongType, MapType,
                               StringType, StructField, StructType)

myschema = StructType([
    StructField("Id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("sentTimestamp", LongType(), True),
    StructField("complex", ArrayType(MapType(StringType(), StringType())), True)
])

The MapType(StringType(), StringType()) means that some value fields in the dictionaries are nulled out because they cannot be mapped.

PySpark only seems to handle dictionaries when all of the values share the same data type.

How can I convert the JSON into a PySpark DataFrame without losing data?

[{
    "Id": "2345123",
    "name": "something",        
    "sentTimestamp": 1646732402,
    "complex":
    [
        {
            "key1": 1,
            "key2": "(1)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "random",
                "innerkey2": 5.4,
                "innerkey3": 1
            }
        },
        {
            "key1": 2,
            "key2": "(2)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "left",
                "innerkey2": 7.8,
                "innerkey3": 1
            }
        }
    ]
}]

You can specify the schema of the complex column as an array of structs.

from pyspark.sql.types import (ArrayType, IntegerType, LongType,
                               StringType, StructField, StructType)

myschema = StructType(
    [
        StructField("Id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("sentTimestamp", LongType(), True),
        StructField(
            "complex",
            ArrayType(StructType(
                [
                    StructField("key1", LongType(), True),
                    StructField("key2", StringType(), True),
                    StructField("key3", StringType(), True),
                    StructField(
                        "key4",
                        StructType(
                            [
                                StructField("innerkey1", StringType(), True),
                                StructField("innerkey2", StringType(), True),
                                StructField("innerkey3", IntegerType(), True),
                            ]
                        )
                    )
                ]
            ))
        )
    ]
)

Adding to @过过招's answer, below is the approach I would personally use, since it involves less code when defining the dataframe schema.

Input JSON

jsonstr = """[{
    "Id": "2345123",
    "name": "something",        
    "sentTimestamp": 1646732402,
    "complex":
    [
        {
            "key1": 1,
            "key2": "(1)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "random",
                "innerkey2": 5.4,
                "innerkey3": 1
            }
        },
        {
            "key1": 2,
            "key2": "(2)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "left",
                "innerkey2": 7.8,
                "innerkey3": 1
            }
        }
    ]
}]"""

Convert it to an RDD -

import json

# sc is the SparkContext (available by default in spark-shell / Databricks)
rdd = sc.parallelize(json.loads(jsonstr))

Create the dataframe -

df = spark.createDataFrame(rdd, 'Id string, name string, sentTimestamp long, complex array<struct<key1:int, key2:string, key3:float, key4:struct<innerkey1:string,innerkey2:float,innerkey3:int>>>')
df.show(truncate=False)

#Output Data
+-------+---------+-------------+----------------------------------------------------------------+
|Id     |name     |sentTimestamp|complex                                                         |
+-------+---------+-------------+----------------------------------------------------------------+
|2345123|something|1646732402   |[{1, (1), 0.5, {random, 5.4, 1}}, {2, (2), 0.5, {left, 7.8, 1}}]|
+-------+---------+-------------+----------------------------------------------------------------+

#Output Schema
root
 |-- Id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- sentTimestamp: long (nullable = true)
 |-- complex: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key1: integer (nullable = true)
 |    |    |-- key2: string (nullable = true)
 |    |    |-- key3: float (nullable = true)
 |    |    |-- key4: struct (nullable = true)
 |    |    |    |-- innerkey1: string (nullable = true)
 |    |    |    |-- innerkey2: float (nullable = true)
 |    |    |    |-- innerkey3: integer (nullable = true)

If you don't want to pass a schema, or you want Spark (3.0+) to detect the schema, you can write the JSON to a table

%sql

CREATE TABLE newtable AS SELECT
'{
    "Id": "2345123",
    "name": "something",        
    "sentTimestamp": 1646732402,
    "complex":
    [
        {
            "key1": 1,
            "key2": "(1)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "random",
                "innerkey2": 5.4,
                "innerkey3": 1
            }
        },
        {
            "key1": 2,
            "key2": "(2)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "left",
                "innerkey2": 7.8,
                "innerkey3": 1
            }
        }
    ]
}' as original

Convert the table to a dataframe -

df1 = spark.sql('select * from newtable')

Pull the single column of the table into an RDD -

from pyspark.sql.functions import col

rdd = df1.select(col("original").alias("jsoncol")).rdd.map(lambda x: x.jsoncol)

Use .read to infer the schema from the RDD and store it in a variable -

newschema = spark.read.json(rdd).schema

Assign the schema to the column using select -

from pyspark.sql.functions import from_json

df3 = df1.select("*", from_json("original", newschema).alias("transrequest"))

df3.select('transrequest.*').show(truncate=False)

+-------+----------------------------------------------------------------+---------+-------------+
|Id     |complex                                                         |name     |sentTimestamp|
+-------+----------------------------------------------------------------+---------+-------------+
|2345123|[{1, (1), 0.5, {random, 5.4, 1}}, {2, (2), 0.5, {left, 7.8, 1}}]|something|1646732402   |
+-------+----------------------------------------------------------------+---------+-------------+

Schema

root
 |-- Id: string (nullable = true)
 |-- complex: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key1: long (nullable = true)
 |    |    |-- key2: string (nullable = true)
 |    |    |-- key3: double (nullable = true)
 |    |    |-- key4: struct (nullable = true)
 |    |    |    |-- innerkey1: string (nullable = true)
 |    |    |    |-- innerkey2: double (nullable = true)
 |    |    |    |-- innerkey3: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sentTimestamp: long (nullable = true)