PySpark:有没有办法将地图类型转换为结构?

PySpark: Is there a way to convert map type to struct?

我使用 rdd.map 从这样的列中提取和解码 json:

def process_data(row):
    encoded_data = json.loads(row["value"])
    base64_bytes = encoded_data["payload"].encode('ascii')
    ecoded_data_bytes = base64.b64decode(base64_bytes)
    data = json.loads(ecoded_data_bytes.decode('ascii'), strict=False)
    return data, row["file_name"], row["load_time"]

df = df.rdd.map(process_data).toDF

我得到了一个映射类型的数据列,但我想要它作为一个结构,我可以这样做吗?

我正在处理的一行数据如下所示:

{“value” = <encoded data>, “file_name”=“a”, “load_time”=1/1/1}

编码后的数据(值)如下所示:

{“payload”=[
  {
    “key_1”={
     “key_2”=val_2, 
     “key_3”=val_3
    }
  }, {
    “key_1”={
     “key_2”=val_2, 
     “key_3”=val_3
  }}, 
}]}

为了避免这个问题,我还尝试使用 'withColumn' 来解码和加载 json, 但是当我用这个命令加载 json 时:

df.withColumn("payload", from_json(col("payload"), json_schema))

“payload”中的每个单元格都返回 null(即使我将自己限制为只有一行)。

为什么这种负载不起作用?有没有更好的方法?

将地图投射到json部分:问了同事后,我明白这样的投射是行不通的,因为地图类型是键值没有任何特定模式的类型不像结构类型。由于需要更多信息,无法映射到结构转换。

对于加载 json 部分: 在删除 json 加载并使用 " failfast" 模式加载 json:

json_schema = spark.read.json(df.rdd.map(lambda row: row["payload"])).schema
df = df.withColumn("payload", from_json(col("payload"), json_schema, options={"mode": "FAILFAST"}))

我遇到了一个异常:BadRecordException: java.lang.RuntimeException: Parsing JSON arrays as structs is forbidden.

我用另一个 json 扭曲了负载,像这样:

def warp_data(payload):
  try:
    payload = json.loads(payload, strict=False)
    payload_as_dictionary = {"payload": payload}
    return json.dumps(payload_as_dictionary)
  except:
    return None

warp_data_udf = udf(warp_data)

正在做:

df.withColumn("payload", warp_data_udf("payload"))

之后,我能够加载 json 并使用它。