如何使用 Pyspark 将数组<string> 转换为数组<struct>?

How to convert array<string> to array<struct> using Pyspark?

我有一个 spark dataframe (df) 列 - name, id, project, start_date, status

聚合中使用to_json函数时,使payload的数据类型为array<string>。如何将 array<string> 转换为 array<struct<project:string, start_date:date, status: string>>?需要此转换才能从红移光谱访问。

df_gp= df.groupBy(F.col('name'),
                          F.col('id')).agg(F.collect_list(
                          F.to_json(F.struct(('project'),
                                             ('start_date'),
                                             ('status')))).alias("payload"))


我遵循了中给出的步骤, this documentation

import json
def parse_json(array_str):
    json_obj = json.loads(array_str)
    for item in json_obj: 
        yield (item["project"], item["start_date"],item["status"])

json_schema = ArrayType(StructType([StructField('project', StringType(), nullable=True)
, StructField('start_date', DateType(), nullable=True)
, StructField('status', StringType(), nullable=True)]))

udf_parse_json = F.udf(lambda str: parse_json(str), json_schema)
df_new = df_gp.select(df_gp.name, df_gp.id, udf_parse_json(df_gp.payload).alias("payload"))

#works and shows intended schema
df_new.schema

# the following fails
df_new.show(truncate = False)

它抛出错误:

TypeError: the JSON object must be str, bytes or bytearray, not 'generator'

我该如何解决这个问题?

您的聚合中不需要 to_json,没有它也能正常工作。

df.groupBy(F.col('name'),F.col('id')).agg(F.collect_list(
                          F.struct(('project'),
                                             ('start_date'),
                                             ('status'))).alias("payload")).printSchema()


#root
# |-- name: string (nullable = true)
# |-- id: long (nullable = true)
# |-- payload: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- project: string (nullable = true)
# |    |    |-- start_date: date (nullable = true)
# |    |    |-- status: string (nullable = true)