Read one column as JSON strings and another as a regular column using a PySpark DataFrame
I have a dataframe like this:
col1 | col2 |
-----------------------
test:1 | {"test1:subtest1":[{"Id":"17","cName":"c1"}], "test1:subtest2":[{"Id":"01","cName":"c2"}]}
test:2 | {"test1:subtest2":[{"Id":"18","cName":"c13","pScore":0.00203}]}
I want the output to look like this:
col1 | col2 | Id | cName | pScore |
------------------------------------------------
test:1 | test1:subtest1 | 17 | c1 | null |
test:1 | test1:subtest2 | 01 | c2 | null |
test:2 | test1:subtest2 | 18 | c13 | 0.00203 |
This is a follow-up to this question -
I'm new to pyspark and would really appreciate any help with this. I tried the solution given in that post, but it keeps giving me the error:
TypeError: type object argument after ** must be a mapping, not list
I also tried the following:
test = sqlContext.read.json(df.rdd.map(lambda r: r.col2))
But that gives me output like the following:
test1:subtest1 | test1:subtest2 |
----------------------------------------------
[{"Id":"17","cName":"c1"}] | [{"Id":"01","cName":"c2"}]
null | [{"Id":"18","cName":"c13","pScore":0.00203}]
I'm not sure how to join col1 back onto the result above ^ to get the desired output.
Any help is much appreciated, thanks in advance!!
You can use the from_json() function. The key is to define the json_schema: you can either create it manually or, if you are using pyspark 2.4+, generate it with the schema_of_json() function (the code below was tested under pyspark 2.4.0):
from pyspark.sql import functions as F
# define all keys with a list:
my_keys = ['test1:subtest1', 'test1:subtest2']
# take a sample JSON string for a single key with all sub-fields and construct its json_schema from it
key_schema = df.select(F.schema_of_json('{"test1:subtest1":[{"Id":"17","cName":"c1","pScore":0.00203}]}').alias('schema')).first().schema
>>> key_schema
u'struct<test1:subtest1:array<struct<Id:string,cName:string,pScore:double>>>'
# use the above sample key_schema to create the json_schema for all keys
schema = u'struct<' + ','.join([r'`{}`:array<struct<Id:string,cName:string,pScore:double>>'.format(k) for k in my_keys]) + r'>'
>>> schema
u'struct<`test1:subtest1`:array<struct<Id:string,cName:string,pScore:double>>,`test1:subtest2`:array<struct<Id:string,cName:string,pScore:double>>>'
Note: a field name must be enclosed in backticks when it contains special characters such as ':' or '.'
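If you are on an older pyspark without schema_of_json(), here is a minimal sketch of building the equivalent schema by hand with StructType (this block is my own assumption, not part of the original answer; field names are taken from the sample data above):
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, DoubleType

# each key maps to an array of structs with the fields Id, cName and pScore
element = StructType([
    StructField('Id', StringType()),
    StructField('cName', StringType()),
    StructField('pScore', DoubleType())
])
# one array<struct> field per key in my_keys; a StructType can be passed
# to from_json() directly in place of the DDL string above
manual_schema = StructType([StructField(k, ArrayType(element)) for k in my_keys])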
Once you have the schema, you can retrieve the JSON data from col2:
# parse col2 with the schema, then flatten the parsed struct into top-level columns
df1 = df.withColumn('data', F.from_json('col2', schema)).select('col1', 'data.*')
>>> df1.printSchema()
root
|-- col1: string (nullable = true)
|-- test1:subtest1: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Id: string (nullable = true)
| | |-- cName: string (nullable = true)
| | |-- pScore: double (nullable = true)
|-- test1:subtest2: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Id: string (nullable = true)
| | |-- cName: string (nullable = true)
| | |-- pScore: double (nullable = true)
>>> df1.show(2,0)
+------+--------------+--------------------+
|col1 |test1:subtest1|test1:subtest2 |
+------+--------------+--------------------+
|test:1|[[17, c1,]] |[[01, c2,]] |
|test:2|null |[[18, c13, 0.00203]]|
+------+--------------+--------------------+
Then you can use select and union to normalize the dataframe:
# one select per key: tag each row with the key name in col2 and explode its array of structs
df_new = df1.select('col1', F.lit('test1:subtest1').alias('col2'), F.explode(F.col('test1:subtest1')).alias('arr')) \
    .union(
        df1.select('col1', F.lit('test1:subtest2'), F.explode(F.col('test1:subtest2')))
    ).select('col1', 'col2', 'arr.*')
>>> df_new.show()
+------+--------------+---+-----+-------+
| col1| col2| Id|cName| pScore|
+------+--------------+---+-----+-------+
|test:1|test1:subtest1| 17| c1| null|
|test:1|test1:subtest2| 01| c2| null|
|test:2|test1:subtest2| 18| c13|0.00203|
+------+--------------+---+-----+-------+
Using reduce():
When there are many unique keys in the JSON strings, use the reduce function to create df_new:
from functools import reduce
# union one exploded select per key, then flatten the struct fields
df_new = reduce(lambda x,y: x.union(y)
    , [ df1.select('col1', F.lit(k).alias('col2'), F.explode(F.col(k)).alias('arr')) for k in my_keys ]
    ).select('col1', 'col2', 'arr.*')
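As a side note: if the set of keys is not known in advance, one alternative sketch (not from the original answer, and assuming every value shares the same array<struct> element type) is to parse col2 as a map and explode it, which avoids enumerating my_keys entirely:
# hypothetical alternative: treat the top-level JSON object as a map
map_schema = 'map<string,array<struct<Id:string,cName:string,pScore:double>>>'
df_alt = df.withColumn('m', F.from_json('col2', map_schema)) \
    .select('col1', F.explode('m').alias('col2', 'arr')) \
    .select('col1', 'col2', F.explode('arr').alias('e')) \
    .select('col1', 'col2', 'e.*')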