Pyspark: Create Schema from Json Schema involving Array columns
I have the schema for my df defined in a json file as follows:
{
  "table1": {
    "fields": [
      {"metadata": {}, "name": "first_name", "type": "string", "nullable": false},
      {"metadata": {}, "name": "last_name", "type": "string", "nullable": false},
      {"metadata": {}, "name": "subjects", "type": "array", "items": {"type": ["string", "string"]}, "nullable": false},
      {"metadata": {}, "name": "marks", "type": "array", "items": {"type": ["integer", "integer"]}, "nullable": false},
      {"metadata": {}, "name": "dept", "type": "string", "nullable": false}
    ]
  }
}
Example JSON data:
{
"table1": [
{
"first_name":"john",
"last_name":"doe",
"subjects":["maths","science"],
"marks":[90,67],
"dept":"abc"
},
{
"first_name":"dan",
"last_name":"steyn",
"subjects":["maths","science"],
"marks":[90,67],
"dept":"abc"
},
{
"first_name":"rose",
"last_name":"wayne",
"subjects":["maths","science"],
"marks":[90,67],
"dept":"abc"
},
{
"first_name":"nat",
"last_name":"lee",
"subjects":["maths","science"],
"marks":[90,67],
"dept":"abc"
},
{
"first_name":"jim",
"last_name":"lim",
"subjects":["maths","science"],
"marks":[90,67],
"dept":"abc"
}
]
}
I want to create the equivalent Spark schema from this json file. Below is my code: (reference: )
import json
from pyspark.sql.types import StructType

with open(schemaFile) as s:
    schema = json.load(s)["table1"]
source_schema = StructType.fromJson(schema)
The above code works fine as long as I don't have any array columns. But if the schema contains array columns, it throws the following error:
"Could not parse datatype: array"
("Could not parse datatype: %s" % json_value)
In your case, the representation of the array is the problem. The correct syntax is:
{
  "metadata": {},
  "name": "marks",
  "nullable": true,
  "type": {"containsNull": true, "elementType": "long", "type": "array"}
}
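As a sketch, applying that fix to every array column in the question's schema file might look like the following; whether `marks` should use `"integer"` or `"long"` as the `elementType` depends on your data (`"long"` matches what Spark's json reader infers):

```json
{
  "table1": {
    "fields": [
      {"metadata": {}, "name": "first_name", "type": "string", "nullable": false},
      {"metadata": {}, "name": "last_name", "type": "string", "nullable": false},
      {"metadata": {}, "name": "subjects", "nullable": false,
       "type": {"containsNull": true, "elementType": "string", "type": "array"}},
      {"metadata": {}, "name": "marks", "nullable": false,
       "type": {"containsNull": true, "elementType": "long", "type": "array"}},
      {"metadata": {}, "name": "dept", "type": "string", "nullable": false}
    ],
    "type": "struct"
  }
}
```

A file in this layout can be loaded with `StructType.fromJson` exactly as in the question's snippet.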
To retrieve the schema from the json data itself, you can use the following pyspark snippet:
jsonData = """{
"table1": [{
"first_name": "john",
"last_name": "doe",
"subjects": ["maths", "science"],
"marks": [90, 67],
"dept": "abc"
},
{
"first_name": "dan",
"last_name": "steyn",
"subjects": ["maths", "science"],
"marks": [90, 67],
"dept": "abc"
},
{
"first_name": "rose",
"last_name": "wayne",
"subjects": ["maths", "science"],
"marks": [90, 67],
"dept": "abc"
},
{
"first_name": "nat",
"last_name": "lee",
"subjects": ["maths", "science"],
"marks": [90, 67],
"dept": "abc"
},
{
"first_name": "jim",
"last_name": "lim",
"subjects": ["maths", "science"],
"marks": [90, 67],
"dept": "abc"
}
]
}"""
df = spark.read.json(sc.parallelize([jsonData]))
df.schema.json()
This should output:
{
"fields": [{
"metadata": {},
"name": "table1",
"nullable": true,
"type": {
"containsNull": true,
"elementType": {
"fields": [{
"metadata": {},
"name": "dept",
"nullable": true,
"type": "string"
}, {
"metadata": {},
"name": "first_name",
"nullable": true,
"type": "string"
}, {
"metadata": {},
"name": "last_name",
"nullable": true,
"type": "string"
}, {
"metadata": {},
"name": "marks",
"nullable": true,
"type": {
"containsNull": true,
"elementType": "long",
"type": "array"
}
}, {
"metadata": {},
"name": "subjects",
"nullable": true,
"type": {
"containsNull": true,
"elementType": "string",
"type": "array"
}
}],
"type": "struct"
},
"type": "array"
}
}],
"type": "struct"
}
Alternatively, you can use df.schema.simpleString(), which returns a relatively simpler schema format:
struct<table1:array<struct<dept:string,first_name:string,last_name:string,marks:array<bigint>,subjects:array<string>>>>
Finally, you can store the above schema in a file and load it later:
import json
from pyspark.sql.types import StructType

# schema_json is the string previously produced by df.schema.json()
new_schema = StructType.fromJson(json.loads(schema_json))
just as you are already doing. Keep in mind that you can also apply the described process dynamically to any json data.
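Putting the pieces together, a minimal round trip of saving and reloading the schema could be sketched as follows. The file name `table1_schema.json` is purely illustrative, the schema string is inlined here instead of coming from a real DataFrame, and the final pyspark step is shown as a comment since it assumes pyspark is installed:

```python
import json

# Schema string in the shape produced by df.schema.json()
# (inlined for this sketch; in practice capture it from your DataFrame).
schema_json = json.dumps({
    "type": "struct",
    "fields": [
        {"metadata": {}, "name": "subjects", "nullable": True,
         "type": {"containsNull": True, "elementType": "string", "type": "array"}},
    ],
})

# Persist the schema string to a file ...
with open("table1_schema.json", "w") as f:
    f.write(schema_json)

# ... and load it back later as a plain dict.
with open("table1_schema.json") as f:
    loaded = json.load(f)

# With pyspark installed, the StructType can then be rebuilt:
# from pyspark.sql.types import StructType
# new_schema = StructType.fromJson(loaded)
```

The dict round-trips losslessly through the file, so `fromJson` sees exactly what `df.schema.json()` emitted.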