Mixed schema datatype JSON to PySpark DataFrame
I need to convert a list of JSON objects into a PySpark DataFrame. The JSON objects all share the same schema. The problem is that the value entries of the dictionaries inside the JSON have different datatypes.
Example: the field complex is an array of dictionaries, and each dictionary has four keys but values of different types (integer, string, float, and a nested dictionary). See the sample JSON below, followed by a short reproduction sketch.
If I create my DataFrame from the JSON with df = spark.createDataFrame(json_list), PySpark "drops" some data because it cannot infer the schema correctly. PySpark decides the schema of the
complex
field should be StructType("complex", ArrayType(MapType(StringType(), LongType()))),
which blanks out every value that is not a LongType.
I tried to provide a schema, but the value field of the nested MapType requires one specific DataType, and my values are not uniform; they vary...
from pyspark.sql.types import (
    StructType, StructField, StringType, LongType, ArrayType, MapType
)

myschema = StructType([
    StructField("Id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("sentTimestamp", LongType(), True),
    StructField("complex", ArrayType(MapType(StringType(), StringType())), True)
])
With MapType(StringType(), StringType()), some of the value fields in the dictionaries are blanked out because they cannot be mapped to a string.
PySpark only seems to handle dictionaries when all of the values share the same datatype.
How can I convert the JSON to a PySpark DataFrame without losing data?
[{
"Id": "2345123",
"name": "something",
"sentTimestamp": 1646732402,
"complex":
[
{
"key1": 1,
"key2": "(1)",
"key3": 0.5,
"key4":
{
"innerkey1": "random",
"innerkey2": 5.4,
"innerkey3": 1
}
},
{
"key1": 2,
"key2": "(2)",
"key3": 0.5,
"key4":
{
"innerkey1": "left",
"innerkey2": 7.8,
"innerkey3": 1
}
}
]
}]
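For reference, a minimal reproduction sketch of the lossy inference described above (json_str is a hypothetical variable holding the sample JSON above as a string; the inferred map value type and the resulting nulls follow the behaviour described in the question):

import json

json_list = json.loads(json_str)  # json_str: the sample JSON above (hypothetical variable)
df = spark.createDataFrame(json_list)
df.printSchema()  # complex is inferred as array<map<string,bigint>>
# Non-long values inside each dictionary (strings, floats, nested dicts) come back as null
df.select("complex").show(truncate=False)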
You can specify the schema of the complex column as an array of structs.
from pyspark.sql.types import (
    StructType, StructField, StringType, LongType,
    DoubleType, IntegerType, ArrayType
)

myschema = StructType(
    [
        StructField("Id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("sentTimestamp", LongType(), True),
        StructField(
            "complex",
            ArrayType(StructType(
                [
                    StructField("key1", LongType(), True),
                    StructField("key2", StringType(), True),
                    StructField("key3", DoubleType(), True),  # float value, so DoubleType
                    StructField(
                        "key4",
                        StructType(
                            [
                                StructField("innerkey1", StringType(), True),
                                StructField("innerkey2", DoubleType(), True),  # float value
                                StructField("innerkey3", IntegerType(), True),
                            ]
                        )
                    )
                ]
            ))
        )
    ]
)
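Applying it (a minimal sketch, assuming json_list is the parsed sample JSON from the question):

df = spark.createDataFrame(json_list, schema=myschema)
df.printSchema()
df.show(truncate=False)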
Adding to @过过招's answer, below is the approach I would personally use, since it involves less code when defining the dataframe schema.
Input JSON
jsonstr = """[{
"Id": "2345123",
"name": "something",
"sentTimestamp": 1646732402,
"complex":
[
{
"key1": 1,
"key2": "(1)",
"key3": 0.5,
"key4":
{
"innerkey1": "random",
"innerkey2": 5.4,
"innerkey3": 1
}
},
{
"key1": 2,
"key2": "(2)",
"key3": 0.5,
"key4":
{
"innerkey1": "left",
"innerkey2": 7.8,
"innerkey3": 1
}
}
]
}]"""
Converting it to an RDD -
import json

# Parse the JSON string into a list of dicts and distribute it as an RDD
rdd = sc.parallelize(json.loads(jsonstr))
Creating the dataframe -
df = spark.createDataFrame(rdd, 'Id string, name string, sentTimestamp long, complex array<struct<key1:int, key2:string, key3:float, key4:struct<innerkey1:string,innerkey2:float,innerkey3:int>>>')
df.show(truncate=False)
#Output Data
+-------+---------+-------------+----------------------------------------------------------------+
|Id |name |sentTimestamp|complex |
+-------+---------+-------------+----------------------------------------------------------------+
|2345123|something|1646732402 |[{1, (1), 0.5, {random, 5.4, 1}}, {2, (2), 0.5, {left, 7.8, 1}}]|
+-------+---------+-------------+----------------------------------------------------------------+
#Output Schema
root
|-- Id: string (nullable = true)
|-- name: string (nullable = true)
|-- sentTimestamp: long (nullable = true)
|-- complex: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key1: integer (nullable = true)
| | |-- key2: string (nullable = true)
| | |-- key3: float (nullable = true)
| | |-- key4: struct (nullable = true)
| | | |-- innerkey1: string (nullable = true)
| | | |-- innerkey2: float (nullable = true)
| | | |-- innerkey3: integer (nullable = true)
If you don't want to pass a schema, or want Spark (3.0+) to detect the schema, you can write the JSON into a table
%sql
CREATE TABLE newtable AS SELECT
'{
"Id": "2345123",
"name": "something",
"sentTimestamp": 1646732402,
"complex":
[
{
"key1": 1,
"key2": "(1)",
"key3": 0.5,
"key4":
{
"innerkey1": "random",
"innerkey2": 5.4,
"innerkey3": 1
}
},
{
"key1": 2,
"key2": "(2)",
"key3": 0.5,
"key4":
{
"innerkey1": "left",
"innerkey2": 7.8,
"innerkey3": 1
}
}
]
}' as original
Converting the table to a dataframe

df1 = spark.sql('select * from newtable')
Creating an rdd from the single column in the table

from pyspark.sql.functions import col, from_json

rdd = df1.select(col("original").alias("jsoncol")).rdd.map(lambda x: x.jsoncol)
Using .read to read the rdd's schema and assigning it to a variable

newschema = spark.read.json(rdd).schema
Using select to assign the schema to the column

df3 = df1.select("*", from_json("original", newschema).alias("transrequest"))
df3.select('transrequest.*').show(truncate=False)
+-------+----------------------------------------------------------------+---------+-------------+
|Id |complex |name |sentTimestamp|
+-------+----------------------------------------------------------------+---------+-------------+
|2345123|[{1, (1), 0.5, {random, 5.4, 1}}, {2, (2), 0.5, {left, 7.8, 1}}]|something|1646732402 |
+-------+----------------------------------------------------------------+---------+-------------+
Schema
root
|-- Id: string (nullable = true)
|-- complex: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key1: long (nullable = true)
| | |-- key2: string (nullable = true)
| | |-- key3: double (nullable = true)
| | |-- key4: struct (nullable = true)
| | | |-- innerkey1: string (nullable = true)
| | | |-- innerkey2: double (nullable = true)
| | | |-- innerkey3: long (nullable = true)
|-- name: string (nullable = true)
|-- sentTimestamp: long (nullable = true)
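As a side note, the intermediate table is only a convenience; the same schema inference works directly from the JSON string (a minimal sketch, reusing jsonstr from above; spark.read.json can infer a schema from an RDD of JSON strings, as the step above already relies on):

import json

# One JSON string per record, so spark.read.json can infer the schema
string_rdd = sc.parallelize([json.dumps(rec) for rec in json.loads(jsonstr)])
df = spark.read.json(string_rdd)
df.printSchema()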