如何使用 pyspark SQL 将字符串传输到字典

How to transfer the string to a dict with pysparkSQL

在 pysparkSQL 中,我有一个名为 bmd2 的 DataFrame,如下所示:

DataFrame[genres: string, id: int, tagline: string, title: string, vote_average: double, vote_count: int]

数据bmd2['genres']是这样的:

bmd2.select('genres').show():
+--------------------+
|              genres|
+--------------------+
|[{'id': 16, 'name...|
|[{'id': 12, 'name...|
|[{'id': 10749, 'n...|
|[{'id': 35, 'name...|
|[{'id': 35, 'name...|
|[{'id': 28, 'name...|
|[{'id': 35, 'name...|
|[{'id': 28, 'name...|
|[{'id': 28, 'name...|
|[{'id': 12, 'name...|
|[{'id': 35, 'name...|
|[{'id': 35, 'name...|
|[{'id': 10751, 'n...|
|[{'id': 36, 'name...|
|[{'id': 28, 'name...|
|[{'id': 18, 'name...|
|[{'id': 18, 'name...|
|[{'id': 80, 'name...|
|[{'id': 80, 'name...|
|[{'id': 28, 'name...|
+--------------------+
only showing top 20 rows

'genres' 列中的数据类型是字符串,但可以将它们传输到 python 中具有 'eval function' 的字典列表。那么我应该如何在这里应用eval()来将这里的字符串传输到每一行中的列表呢?我尝试了很多方法:

  1. bmd2.select('genres'.astype('list')):AttributeError: 'str' object has no attribute 'astype'
  2. bmd2.select(eval('genres')):NameError: name 'genres' is not defined
  3. bmd2.withColumn('genres',eval('genres')):NameError: name 'genres' is not defined

我写这个作为答案,因为我找不到评论选项。我建议你看看 pyspark.sql.functions 中的 from_json。例如,您可以这样使用它:

# given a row that looks like:

+----------genres-------------+
| [{ id:1, name:"hiphop"}]    |
+-----------------------------+

# define a schema
schema = ArrayType(StructType().add("id", IntegerType())\
                              .add("name", StringType()))

# transform
new_df = df.select(from_json("genres", schema).alias("genres_dict"))

# display
new_df.printSchema()
new_df.show()

还有一种方法可以使用名为 regexp_extract 的函数来实现此目的。但以上是我个人的喜好。此外,如果您想切换回原始字符串,可以使用 to_json 函数。希望这有帮助。

我使用用户定义函数 UDF 解决了我的问题。

首先,导入它:

from pyspark.sql.functions import udf

然后,定义您的 UDF,就像匿名函数一样:

getdirector = udf(lambda x:[i['name'] for i in x if i['job'] == 'Director'],StringType())

您应该在此处指定 return 值的类型,这样您将获得具有您期望类型的 return 值。然后你就可以像其他函数一样在你的代码中调用这个UDF了。

cres2 = cres1.select('id',getcharacter('cast').alias('cast'),getdirector('crew').alias('crew'))

在这个问题中,我可以修改UDF以获得我需要的任何类型。