如何使用 Pyspark 将数组<string> 转换为数组<struct>?
How to convert array<string> to array<struct> using Pyspark?
我有一个 spark dataframe (df)
列 - name, id, project, start_date, status
聚合中使用to_json
函数时,使payload的数据类型为array<string>
。如何将 array<string>
转换为 array<struct<project:string, start_date:date, status: string>>
?需要此转换才能从红移光谱访问。
df_gp= df.groupBy(F.col('name'),
F.col('id')).agg(F.collect_list(
F.to_json(F.struct(('project'),
('start_date'),
('status')))).alias("payload"))
我遵循了中给出的步骤,
this documentation
import json
def parse_json(array_str):
json_obj = json.loads(array_str)
for item in json_obj:
yield (item["project"], item["start_date"],item["status"])
json_schema = ArrayType(StructType([StructField('project', StringType(), nullable=True)
, StructField('start_date', DateType(), nullable=True)
, StructField('status', StringType(), nullable=True)]))
udf_parse_json = F.udf(lambda str: parse_json(str), json_schema)
df_new = df_gp.select(df_gp.name, df_gp.id, udf_parse_json(df_gp.payload).alias("payload"))
#works and shows intended schema
df_new.schema
# the following fails
df_new.show(truncate = False)
它抛出错误:
TypeError: the JSON object must be str, bytes or bytearray, not 'generator'
我该如何解决这个问题?
您的聚合中不需要 to_json
,没有它也能正常工作。
df.groupBy(F.col('name'),F.col('id')).agg(F.collect_list(
F.struct(('project'),
('start_date'),
('status'))).alias("payload")).printSchema()
#root
# |-- name: string (nullable = true)
# |-- id: long (nullable = true)
# |-- payload: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- project: string (nullable = true)
# | | |-- start_date: date (nullable = true)
# | | |-- status: string (nullable = true)
我有一个 spark dataframe (df)
列 - name, id, project, start_date, status
聚合中使用to_json
函数时,使payload的数据类型为array<string>
。如何将 array<string>
转换为 array<struct<project:string, start_date:date, status: string>>
?需要此转换才能从红移光谱访问。
df_gp= df.groupBy(F.col('name'),
F.col('id')).agg(F.collect_list(
F.to_json(F.struct(('project'),
('start_date'),
('status')))).alias("payload"))
我遵循了中给出的步骤, this documentation
import json
def parse_json(array_str):
json_obj = json.loads(array_str)
for item in json_obj:
yield (item["project"], item["start_date"],item["status"])
json_schema = ArrayType(StructType([StructField('project', StringType(), nullable=True)
, StructField('start_date', DateType(), nullable=True)
, StructField('status', StringType(), nullable=True)]))
udf_parse_json = F.udf(lambda str: parse_json(str), json_schema)
df_new = df_gp.select(df_gp.name, df_gp.id, udf_parse_json(df_gp.payload).alias("payload"))
#works and shows intended schema
df_new.schema
# the following fails
df_new.show(truncate = False)
它抛出错误:
TypeError: the JSON object must be str, bytes or bytearray, not 'generator'
我该如何解决这个问题?
您的聚合中不需要 to_json
,没有它也能正常工作。
df.groupBy(F.col('name'),F.col('id')).agg(F.collect_list(
F.struct(('project'),
('start_date'),
('status'))).alias("payload")).printSchema()
#root
# |-- name: string (nullable = true)
# |-- id: long (nullable = true)
# |-- payload: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- project: string (nullable = true)
# | | |-- start_date: date (nullable = true)
# | | |-- status: string (nullable = true)