PySpark:有没有办法将地图类型转换为结构?
PySpark: Is there a way to convert map type to struct?
我使用 rdd.map 从这样的列中提取和解码 json:
def process_data(row):
encoded_data = json.loads(row["value"])
base64_bytes = encoded_data["payload"].encode('ascii')
ecoded_data_bytes = base64.b64decode(base64_bytes)
data = json.loads(ecoded_data_bytes.decode('ascii'), strict=False)
return data, row["file_name"], row["load_time"]
df = df.rdd.map(process_data).toDF
我得到了一个映射类型的数据列,但我想要它作为一个结构,我可以这样做吗?
我正在处理的一行数据如下所示:
{“value” = <encoded data>, “file_name”=“a”, “load_time”=1/1/1}
编码后的数据(值)如下所示:
{“payload”=[
{
“key_1”={
“key_2”=val_2,
“key_3”=val_3
}
}, {
“key_1”={
“key_2”=val_2,
“key_3”=val_3
}},
}]}
为了避免这个问题,我还尝试使用 'withColumn' 来解码和加载 json,
但是当我用这个命令加载 json 时:
df.withColumn("payload", from_json(col("payload"), json_schema))
“payload”中的每个单元格都返回 null(即使我将自己限制为只有一行)。
为什么这种负载不起作用?有没有更好的方法?
将地图投射到json部分:问了同事后,我明白这样的投射是行不通的,因为地图类型是键值没有任何特定模式的类型不像结构类型。由于需要更多信息,无法映射到结构转换。
对于加载 json 部分: 在删除 json 加载并使用 " failfast" 模式加载 json:
json_schema = spark.read.json(df.rdd.map(lambda row: row["payload"])).schema
df = df.withColumn("payload", from_json(col("payload"), json_schema, options={"mode": "FAILFAST"}))
我遇到了一个异常:BadRecordException: java.lang.RuntimeException: Parsing JSON arrays as structs is forbidden.
我用另一个 json 扭曲了负载,像这样:
def warp_data(payload):
try:
payload = json.loads(payload, strict=False)
payload_as_dictionary = {"payload": payload}
return json.dumps(payload_as_dictionary)
except:
return None
warp_data_udf = udf(warp_data)
正在做:
df.withColumn("payload", warp_data_udf("payload"))
之后,我能够加载 json 并使用它。
我使用 rdd.map 从这样的列中提取和解码 json:
def process_data(row):
encoded_data = json.loads(row["value"])
base64_bytes = encoded_data["payload"].encode('ascii')
ecoded_data_bytes = base64.b64decode(base64_bytes)
data = json.loads(ecoded_data_bytes.decode('ascii'), strict=False)
return data, row["file_name"], row["load_time"]
df = df.rdd.map(process_data).toDF
我得到了一个映射类型的数据列,但我想要它作为一个结构,我可以这样做吗?
我正在处理的一行数据如下所示:
{“value” = <encoded data>, “file_name”=“a”, “load_time”=1/1/1}
编码后的数据(值)如下所示:
{“payload”=[
{
“key_1”={
“key_2”=val_2,
“key_3”=val_3
}
}, {
“key_1”={
“key_2”=val_2,
“key_3”=val_3
}},
}]}
为了避免这个问题,我还尝试使用 'withColumn' 来解码和加载 json, 但是当我用这个命令加载 json 时:
df.withColumn("payload", from_json(col("payload"), json_schema))
“payload”中的每个单元格都返回 null(即使我将自己限制为只有一行)。
为什么这种负载不起作用?有没有更好的方法?
将地图投射到json部分:问了同事后,我明白这样的投射是行不通的,因为地图类型是键值没有任何特定模式的类型不像结构类型。由于需要更多信息,无法映射到结构转换。
对于加载 json 部分: 在删除 json 加载并使用 " failfast" 模式加载 json:
json_schema = spark.read.json(df.rdd.map(lambda row: row["payload"])).schema
df = df.withColumn("payload", from_json(col("payload"), json_schema, options={"mode": "FAILFAST"}))
我遇到了一个异常:BadRecordException: java.lang.RuntimeException: Parsing JSON arrays as structs is forbidden.
我用另一个 json 扭曲了负载,像这样:
def warp_data(payload):
try:
payload = json.loads(payload, strict=False)
payload_as_dictionary = {"payload": payload}
return json.dumps(payload_as_dictionary)
except:
return None
warp_data_udf = udf(warp_data)
正在做:
df.withColumn("payload", warp_data_udf("payload"))
之后,我能够加载 json 并使用它。