Filter Spark DataFrame using keywords from another DataFrame
I have a large number of news articles loaded into a PySpark DataFrame. I am interested in filtering that DataFrame down to the set of articles whose body text contains certain words of interest. At the moment the list of keywords is small, but I would like to store them in a DataFrame anyway, since the list may expand in the future. Consider the following small example:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

article_data = [{'source': 'a', 'body': 'Seattle is in Washington.'},
                {'source': 'b', 'body': 'Los Angeles is in California'},
                {'source': 'a', 'body': 'Banana is a fruit'}]
article_df = spark.createDataFrame(article_data)

keyword_data = [{'city': 'Seattle', 'state': 'Washington'},
                {'city': 'Los Angeles', 'state': 'California'}]
keyword_df = spark.createDataFrame(keyword_data)
This gives us the following DataFrames:
+--------------------+------+
| body|source|
+--------------------+------+
|Seattle is in Was...| a|
|Los Angeles is in...| b|
| Banana is a fruit| a|
+--------------------+------+
+-----------+----------+
| city| state|
+-----------+----------+
| Seattle|Washington|
|Los Angeles|California|
+-----------+----------+
As a first step, I want to filter article_df so that it only contains articles whose body string contains any of the strings in keyword_df['city']. I would also like to filter it down to articles that contain both a string from keyword_df['city'] and the corresponding (same-row) entry of keyword_df['state']. How can I accomplish this?
I have managed to do this with a manually defined list of keywords:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def city_filter(x):
    cities = ['Seattle', 'Los Angeles']
    x = x.lower()
    return any(s.lower() in x for s in cities)

filterUDF = udf(city_filter, BooleanType())
Then article_df.filter(filterUDF(article_df.body)).show() gives the desired result:
+--------------------+------+
| body|source|
+--------------------+------+
|Seattle is in Was...| a|
|Los Angeles is in...| b|
+--------------------+------+
How can I implement this filter without having to manually define the list of keywords (or tuples of keyword pairs)? Do I need to use a UDF for this?
You can implement it with a leftsemi join and a custom join expression, for example:

from pyspark.sql.functions import expr

body_contains_city = expr('body like concat("%", city, "%")')
article_df.join(keyword_df, body_contains_city, 'leftsemi').show()
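This runs as a native SQL expression, so no UDF is needed. For the second filter (keeping only articles whose body contains both the city and the state from the same keyword row), the same pattern should extend naturally; here is a minimal sketch, assuming you can simply AND two like clauses in the join expression, with lower() added to mirror the case-insensitive matching of your original UDF:

from pyspark.sql.functions import expr

# Keep articles whose body mentions both the city and its same-row state.
# lower() makes the match case-insensitive, like the original UDF did.
body_contains_pair = expr(
    'lower(body) like concat("%", lower(city), "%") '
    'and lower(body) like concat("%", lower(state), "%")'
)
article_df.join(keyword_df, body_contains_pair, 'leftsemi').show()

A leftsemi join also guarantees that each article appears at most once in the result, even if it matches several keyword rows.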