How to generate a new PySpark DataFrame by comparing entries in two others?
I want to search a PySpark DataFrame containing string fields and determine which keyword strings appear in each field. Suppose I have the following DataFrame of keywords:
+-----------+----------+
| city| state|
+-----------+----------+
| Seattle|Washington|
|Los Angeles|California|
+-----------+----------+
and I want to search within this DataFrame:
+----------------------------------------+------+
|body |source|
+----------------------------------------+------+
|Seattle is in Washington. |a |
|Los Angeles is in California |b |
|Banana is a fruit |c |
|Seattle is not in New Hampshire |d |
|California is home to Los Angeles |e |
|Seattle, California is not a real place.|f |
+----------------------------------------+------+
I'd like to create a new DataFrame identifying which type of keyword appears in each source. So the desired final result is:
+-----------+------+-----+
|name |source|type |
+-----------+------+-----+
|Seattle |a |city |
|Washington |a |state|
|Los Angeles|b |city |
|California |b |state|
|Seattle |d |city |
|Los Angeles|e |city |
|California |e |state|
|Seattle |f |city |
|California |f |state|
+-----------+------+-----+
How can I get this result? I could use a join to isolate the body strings that contain these keywords, but I'm not sure how to keep track of which specific keyword matched and use that information to create a new column.
First, let's create and modify the DataFrames:
import pyspark.sql.functions as psf
keywords_df = sc.parallelize([["Seattle", "Washington"], ["Los Angeles", "California"]])\
    .toDF(["city", "state"])
keywords_df = keywords_df\
    .withColumn("struct", psf.explode(psf.array(
        psf.struct(psf.col("city").alias("word"), psf.lit("city").alias("type")),
        psf.struct(psf.col("state").alias("word"), psf.lit("state").alias("type"))
    )))\
    .select("struct.*")
keywords_df.show()
+-----------+-----+
| word| type|
+-----------+-----+
| Seattle| city|
| Washington|state|
|Los Angeles| city|
| California|state|
+-----------+-----+
If your keywords didn't contain spaces, you could split your sentences into words so that, once exploded, you'd get one word per row; you could then join with your keywords DataFrame. Because of Los Angeles, that's not the case here.
text_df = sc.parallelize([
        ["Seattle is in Washington.", "a"],
        ["Los Angeles is in California", "b"],
        ["Banana is a fruit", "c"],
        ["Seattle is not in New Hampshire", "d"],
        ["California is home to Los Angeles", "e"],
        ["Seattle, California is not a real place.", "f"]])\
    .toDF(["body", "source"])
Instead, we'll use a join with a string contains condition:
res = text_df.join(keywords_df, text_df.body.contains(keywords_df.word)).drop("body")
res.show()
+------+-----------+-----+
|source| word| type|
+------+-----------+-----+
| a| Seattle| city|
| a| Washington|state|
| b|Los Angeles| city|
| b| California|state|
| d| Seattle| city|
| f| Seattle| city|
| e|Los Angeles| city|
| e| California|state|
| f| California|state|
+------+-----------+-----+