如何在 PySpark 中将字符串转换为字典的 ArrayType (JSON)
How to cast string to ArrayType of dictionary (JSON) in PySpark
尝试将 StringType 转换为 JSON 的 ArrayType,以获取从 CSV 生成的数据框。
在 Spark2
上使用 pyspark
我正在处理的CSV文件;如下-
date,attribute2,count,attribute3
2017-09-03,'attribute1_value1',2,'[{"key":"value","key2":2},{"key":"value","key2":2},{"key":"value","key2":2}]'
2017-09-04,'attribute1_value2',2,'[{"key":"value","key2":20},{"key":"value","key2":25},{"key":"value","key2":27}]'
如上所示,它在文字串中包含一个属性"attribute3"
,从技术上讲,它是一个精确长度为2的字典列表(JSON)。
(这是函数 distinct 的输出)
来自 printSchema()
的片段
attribute3: string (nullable = true)
我正在尝试将 "attribute3"
转换为 ArrayType
,如下所示
temp = dataframe.withColumn(
"attribute3_modified",
dataframe["attribute3"].cast(ArrayType())
)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: __init__() takes at least 2 arguments (1 given)
确实,ArrayType
需要数据类型作为参数。我尝试使用 "json"
,但没有用。
期望的输出 -
最后,我需要将 attribute3
转换为 ArrayType()
或简单的 Python 列表。 (我试图避免使用 eval
)
如何将其转换为 ArrayType
,以便我可以将其视为 JSON 的列表?
我在这里遗漏了什么吗?
(documentation没有直接解决这个问题)
将 from_json
与与 attribute3
列中的实际数据相匹配的模式一起使用,将 json 转换为 ArrayType:
原始数据框:
df.printSchema()
#root
# |-- date: string (nullable = true)
# |-- attribute2: string (nullable = true)
# |-- count: long (nullable = true)
# |-- attribute3: string (nullable = true)
from pyspark.sql.functions import from_json
from pyspark.sql.types import *
创建架构:
schema = ArrayType(
StructType([StructField("key", StringType()),
StructField("key2", IntegerType())]))
使用from_json
:
df = df.withColumn("attribute3", from_json(df.attribute3, schema))
df.printSchema()
#root
# |-- date: string (nullable = true)
# |-- attribute2: string (nullable = true)
# |-- count: long (nullable = true)
# |-- attribute3: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- key: string (nullable = true)
# | | |-- key2: integer (nullable = true)
df.show(1, False)
#+----------+----------+-----+------------------------------------+
#|date |attribute2|count|attribute3 |
#+----------+----------+-----+------------------------------------+
#|2017-09-03|attribute1|2 |[[value, 2], [value, 2], [value, 2]]|
#+----------+----------+-----+------------------------------------+
@Psidom 的 对我不起作用,因为我使用的是 Spark 2.1。
就我而言,我不得不稍微修改您的 attribute3
字符串以将其包装在字典中:
import pyspark.sql.functions as f
df2 = df.withColumn("attribute3", f.concat(f.lit('{"data": '), "attribute3", f.lit("}")))
df2.select("attribute3").show(truncate=False)
#+--------------------------------------------------------------------------------------+
#|attribute3 |
#+--------------------------------------------------------------------------------------+
#|{"data": [{"key":"value","key2":2},{"key":"value","key2":2},{"key":"value","key2":2}]}|
#+--------------------------------------------------------------------------------------+
现在我可以按如下方式定义架构:
schema = StructType(
[
StructField(
"data",
ArrayType(
StructType(
[
StructField("key", StringType()),
StructField("key2", IntegerType())
]
)
)
)
]
)
现在使用 from_json
然后是 getItem()
:
df3 = df2.withColumn("attribute3", f.from_json("attribute3", schema).getItem("data"))
df3.show(truncate=False)
#+----------+----------+-----+---------------------------------+
#|date |attribute2|count|attribute3 |
#+----------+----------+-----+---------------------------------+
#|2017-09-03|attribute1|2 |[[value,2], [value,2], [value,2]]|
#+----------+----------+-----+---------------------------------+
架构:
df3.printSchema()
# root
# |-- attribute3: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- key: string (nullable = true)
# | | |-- key2: integer (nullable = true)
尝试将 StringType 转换为 JSON 的 ArrayType,以获取从 CSV 生成的数据框。
在 Spark2
pyspark
我正在处理的CSV文件;如下-
date,attribute2,count,attribute3
2017-09-03,'attribute1_value1',2,'[{"key":"value","key2":2},{"key":"value","key2":2},{"key":"value","key2":2}]'
2017-09-04,'attribute1_value2',2,'[{"key":"value","key2":20},{"key":"value","key2":25},{"key":"value","key2":27}]'
如上所示,它在文字串中包含一个属性"attribute3"
,从技术上讲,它是一个精确长度为2的字典列表(JSON)。
(这是函数 distinct 的输出)
来自 printSchema()
attribute3: string (nullable = true)
我正在尝试将 "attribute3"
转换为 ArrayType
,如下所示
temp = dataframe.withColumn(
"attribute3_modified",
dataframe["attribute3"].cast(ArrayType())
)
Traceback (most recent call last): File "<stdin>", line 1, in <module> TypeError: __init__() takes at least 2 arguments (1 given)
确实,ArrayType
需要数据类型作为参数。我尝试使用 "json"
,但没有用。
期望的输出 -
最后,我需要将 attribute3
转换为 ArrayType()
或简单的 Python 列表。 (我试图避免使用 eval
)
如何将其转换为 ArrayType
,以便我可以将其视为 JSON 的列表?
我在这里遗漏了什么吗?
(documentation没有直接解决这个问题)
将 from_json
与与 attribute3
列中的实际数据相匹配的模式一起使用,将 json 转换为 ArrayType:
原始数据框:
df.printSchema()
#root
# |-- date: string (nullable = true)
# |-- attribute2: string (nullable = true)
# |-- count: long (nullable = true)
# |-- attribute3: string (nullable = true)
from pyspark.sql.functions import from_json
from pyspark.sql.types import *
创建架构:
schema = ArrayType(
StructType([StructField("key", StringType()),
StructField("key2", IntegerType())]))
使用from_json
:
df = df.withColumn("attribute3", from_json(df.attribute3, schema))
df.printSchema()
#root
# |-- date: string (nullable = true)
# |-- attribute2: string (nullable = true)
# |-- count: long (nullable = true)
# |-- attribute3: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- key: string (nullable = true)
# | | |-- key2: integer (nullable = true)
df.show(1, False)
#+----------+----------+-----+------------------------------------+
#|date |attribute2|count|attribute3 |
#+----------+----------+-----+------------------------------------+
#|2017-09-03|attribute1|2 |[[value, 2], [value, 2], [value, 2]]|
#+----------+----------+-----+------------------------------------+
@Psidom 的
就我而言,我不得不稍微修改您的 attribute3
字符串以将其包装在字典中:
import pyspark.sql.functions as f
df2 = df.withColumn("attribute3", f.concat(f.lit('{"data": '), "attribute3", f.lit("}")))
df2.select("attribute3").show(truncate=False)
#+--------------------------------------------------------------------------------------+
#|attribute3 |
#+--------------------------------------------------------------------------------------+
#|{"data": [{"key":"value","key2":2},{"key":"value","key2":2},{"key":"value","key2":2}]}|
#+--------------------------------------------------------------------------------------+
现在我可以按如下方式定义架构:
schema = StructType(
[
StructField(
"data",
ArrayType(
StructType(
[
StructField("key", StringType()),
StructField("key2", IntegerType())
]
)
)
)
]
)
现在使用 from_json
然后是 getItem()
:
df3 = df2.withColumn("attribute3", f.from_json("attribute3", schema).getItem("data"))
df3.show(truncate=False)
#+----------+----------+-----+---------------------------------+
#|date |attribute2|count|attribute3 |
#+----------+----------+-----+---------------------------------+
#|2017-09-03|attribute1|2 |[[value,2], [value,2], [value,2]]|
#+----------+----------+-----+---------------------------------+
架构:
df3.printSchema()
# root
# |-- attribute3: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- key: string (nullable = true)
# | | |-- key2: integer (nullable = true)