Pyspark:正则表达式搜索列表中的文本 withColumn
Pyspark: regex search with text in a list withColumn
我是 Spark 的新手,我有一个愚蠢的“什么是最好的方法”问题。基本上,我有一个我想循环的 map(dict) 。在每次迭代期间,我想使用 rlike
正则表达式搜索 spark 数据框中的列,并使用 withColumn
将字典的键分配给新列
maps = {"groceries": ["hot chocolate", "milk", "sugar", "flour"],
"laundry": ["soap", "detergent", "fabric softener"]
}
数据样本如下图
+--------------------+-----------+
| id|item_bought|
+--------------------+-----------+
|uiq7Zq52Bww4pZXc3xri| Soap|
|fpJatwxTeObcbuJH25UI| Detergent|
|MdK1q5gBygIGFYyvbz8J| Milk|
+--------------------+-----------+
我想要一个如下所示的数据框:
+--------------------+-----------+---------+
| id|item_bought| class|
+--------------------+-----------+---------+
|uiq7Zq52Bww4pZXc3xri| Soap| Laundry|
|fpJatwxTeObcbuJH25UI| Detergent| Laundry|
|MdK1q5gBygIGFYyvbz8J| Milk|Groceries|
+--------------------+-----------+---------+
我有超过 1 亿条记录,我想要一种使用 Spark 最佳实践(分布式计算)的方法。我想到的一种方法是遍历地图并使用 rlike
或 str.contains
进行正则表达式搜索,如下所示:
for key, value in maps.items():
pattern = '|'.join([f'(?i){x}' for x in value]). # ignore case
df.withColumn("class", col("item_bought").rlike(pattern))
但是这个returns true
or false
rlike 搜索。我想用 key
值替换 true 或 false。
此外,考虑到我有 100M(最多 150M)条记录,遍历地图是否是最佳方法?
编辑
如果 df
中的 items_bought
有特殊字符(或一些额外的文本)怎么办?
+--------------------+----------------+
| id| item_bought|
+--------------------+----------------+
|uiq7Zq52Bww4pZXc3xri| Soap -&ju10kg|
|fpJatwxTeObcbuJH25UI|Detergent x.ju2i|
|MdK1q5gBygIGFYyvbz8J| Milk|
+--------------------+----------------+
我不想先清理文本,只需根据正则表达式关键字搜索分配 类
有你这种情况,我就把地图变成dataframe。我假设生成的数据帧会相对较小。使用广播加入。这样做的目的是将小 df 分发给每个工作节点,避免洗牌。
#Create df from maps
df_ref = spark.createDataFrame(maps.items(), schema =('class','item_bought')).withColumn('item_bought',explode('item_bought')).withColumn('item_bought', initcap('item_bought'))
#Broadcast join
df.join(broadcast(df_ref), how='left', on='item_bought').show()
+-----------+--------------------+---------+
|item_bought| id| class|
+-----------+--------------------+---------+
| Soap|uiq7Zq52Bww4pZXc3xri| laundry|
| Detergent|fpJatwxTeObcbuJH25UI| laundry|
| Milk|MdK1q5gBygIGFYyvbz8J|groceries|
+-----------+--------------------+---------+
根据您的编辑
df_ref = spark.createDataFrame(maps.items(), schema =('class','item_bought1')).withColumn('item_bought1',explode('item_bought1')).withColumn('item_bought1', initcap('item_bought1'))
df.withColumn('item_bought1',regexp_extract('item_bought','^[A-Za-z]+',0)).join(broadcast(df_ref), how='left', on='item_bought1').show()
+------------+--------------------+----------------+---------+
|item_bought1| id| item_bought| class|
+------------+--------------------+----------------+---------+
| Soap|uiq7Zq52Bww4pZXc3xri| Soap| laundry|
| Detergent|fpJatwxTeObcbuJH25UI| Detergent| laundry|
| Milk|MdK1q5gBygIGFYyvbz8J| Milk|groceries|
| Soap|uiq7Zq52Bww4pZXc3xri| Soap -&ju10kg| laundry|
| Detergent|fpJatwxTeObcbuJH25UI|Detergent x.ju2i| laundry|
| Milk|MdK1q5gBygIGFYyvbz8J| Milk|groceries|
+------------+--------------------+---------- ------+----------+
我是 Spark 的新手,我有一个愚蠢的“什么是最好的方法”问题。基本上,我有一个我想循环的 map(dict) 。在每次迭代期间,我想使用 rlike
正则表达式搜索 spark 数据框中的列,并使用 withColumn
maps = {"groceries": ["hot chocolate", "milk", "sugar", "flour"],
"laundry": ["soap", "detergent", "fabric softener"]
}
数据样本如下图
+--------------------+-----------+
| id|item_bought|
+--------------------+-----------+
|uiq7Zq52Bww4pZXc3xri| Soap|
|fpJatwxTeObcbuJH25UI| Detergent|
|MdK1q5gBygIGFYyvbz8J| Milk|
+--------------------+-----------+
我想要一个如下所示的数据框:
+--------------------+-----------+---------+
| id|item_bought| class|
+--------------------+-----------+---------+
|uiq7Zq52Bww4pZXc3xri| Soap| Laundry|
|fpJatwxTeObcbuJH25UI| Detergent| Laundry|
|MdK1q5gBygIGFYyvbz8J| Milk|Groceries|
+--------------------+-----------+---------+
我有超过 1 亿条记录,我想要一种使用 Spark 最佳实践(分布式计算)的方法。我想到的一种方法是遍历地图并使用 rlike
或 str.contains
进行正则表达式搜索,如下所示:
for key, value in maps.items():
pattern = '|'.join([f'(?i){x}' for x in value]). # ignore case
df.withColumn("class", col("item_bought").rlike(pattern))
但是这个returns true
or false
rlike 搜索。我想用 key
值替换 true 或 false。
此外,考虑到我有 100M(最多 150M)条记录,遍历地图是否是最佳方法?
编辑
如果 df
中的 items_bought
有特殊字符(或一些额外的文本)怎么办?
+--------------------+----------------+
| id| item_bought|
+--------------------+----------------+
|uiq7Zq52Bww4pZXc3xri| Soap -&ju10kg|
|fpJatwxTeObcbuJH25UI|Detergent x.ju2i|
|MdK1q5gBygIGFYyvbz8J| Milk|
+--------------------+----------------+
我不想先清理文本,只需根据正则表达式关键字搜索分配 类
有你这种情况,我就把地图变成dataframe。我假设生成的数据帧会相对较小。使用广播加入。这样做的目的是将小 df 分发给每个工作节点,避免洗牌。
#Create df from maps
df_ref = spark.createDataFrame(maps.items(), schema =('class','item_bought')).withColumn('item_bought',explode('item_bought')).withColumn('item_bought', initcap('item_bought'))
#Broadcast join
df.join(broadcast(df_ref), how='left', on='item_bought').show()
+-----------+--------------------+---------+
|item_bought| id| class|
+-----------+--------------------+---------+
| Soap|uiq7Zq52Bww4pZXc3xri| laundry|
| Detergent|fpJatwxTeObcbuJH25UI| laundry|
| Milk|MdK1q5gBygIGFYyvbz8J|groceries|
+-----------+--------------------+---------+
根据您的编辑
df_ref = spark.createDataFrame(maps.items(), schema =('class','item_bought1')).withColumn('item_bought1',explode('item_bought1')).withColumn('item_bought1', initcap('item_bought1'))
df.withColumn('item_bought1',regexp_extract('item_bought','^[A-Za-z]+',0)).join(broadcast(df_ref), how='left', on='item_bought1').show()
+------------+--------------------+----------------+---------+
|item_bought1| id| item_bought| class|
+------------+--------------------+----------------+---------+
| Soap|uiq7Zq52Bww4pZXc3xri| Soap| laundry|
| Detergent|fpJatwxTeObcbuJH25UI| Detergent| laundry|
| Milk|MdK1q5gBygIGFYyvbz8J| Milk|groceries|
| Soap|uiq7Zq52Bww4pZXc3xri| Soap -&ju10kg| laundry|
| Detergent|fpJatwxTeObcbuJH25UI|Detergent x.ju2i| laundry|
| Milk|MdK1q5gBygIGFYyvbz8J| Milk|groceries|
+------------+--------------------+---------- ------+----------+