Pyspark:正则表达式搜索列表中的文本 withColumn

Pyspark: regex search with text in a list withColumn

我是 Spark 的新手,我有一个愚蠢的“什么是最好的方法”问题。基本上,我有一个我想循环的 map(dict) 。在每次迭代期间,我想使用 rlike 正则表达式搜索 spark 数据框中的列,并使用 withColumn

将字典的键分配给新列
maps = {"groceries": ["hot chocolate", "milk", "sugar", "flour"],
        "laundry": ["soap", "detergent", "fabric softener"]
}

数据样本如下图

+--------------------+-----------+
|                  id|item_bought|
+--------------------+-----------+
|uiq7Zq52Bww4pZXc3xri|       Soap|
|fpJatwxTeObcbuJH25UI|  Detergent|
|MdK1q5gBygIGFYyvbz8J|       Milk|
+--------------------+-----------+

我想要一个如下所示的数据框:

+--------------------+-----------+---------+
|                  id|item_bought|    class|
+--------------------+-----------+---------+
|uiq7Zq52Bww4pZXc3xri|       Soap|  Laundry|
|fpJatwxTeObcbuJH25UI|  Detergent|  Laundry|
|MdK1q5gBygIGFYyvbz8J|       Milk|Groceries|
+--------------------+-----------+---------+

我有超过 1 亿条记录,我想要一种使用 Spark 最佳实践(分布式计算)的方法。我想到的一种方法是遍历地图并使用 rlikestr.contains 进行正则表达式搜索,如下所示:

for key, value in maps.items():
    pattern = '|'.join([f'(?i){x}' for x in value]). # ignore case
    df.withColumn("class", col("item_bought").rlike(pattern))

但是这个returns true or false rlike 搜索。我想用 key 值替换 true 或 false。

此外,考虑到我有 100M(最多 150M)条记录,遍历地图是否是最佳方法?

编辑

如果 df 中的 items_bought 有特殊字符(或一些额外的文本)怎么办?

+--------------------+----------------+
|                  id|     item_bought|
+--------------------+----------------+
|uiq7Zq52Bww4pZXc3xri|   Soap -&ju10kg|
|fpJatwxTeObcbuJH25UI|Detergent x.ju2i|
|MdK1q5gBygIGFYyvbz8J|            Milk|
+--------------------+----------------+

我不想先清理文本,只需根据正则表达式关键字搜索分配 类

有你这种情况,我就把地图变成dataframe。我假设生成的数据帧会相对较小。使用广播加入。这样做的目的是将小 df 分发给每个工作节点,避免洗牌。

#Create df from maps
    df_ref = spark.createDataFrame(maps.items(), schema =('class','item_bought')).withColumn('item_bought',explode('item_bought')).withColumn('item_bought', initcap('item_bought'))

#Broadcast join    
    df.join(broadcast(df_ref), how='left', on='item_bought').show()


+-----------+--------------------+---------+
|item_bought|                  id|    class|
+-----------+--------------------+---------+
|       Soap|uiq7Zq52Bww4pZXc3xri|  laundry|
|  Detergent|fpJatwxTeObcbuJH25UI|  laundry|
|       Milk|MdK1q5gBygIGFYyvbz8J|groceries|
+-----------+--------------------+---------+

根据您的编辑

df_ref = spark.createDataFrame(maps.items(), schema =('class','item_bought1')).withColumn('item_bought1',explode('item_bought1')).withColumn('item_bought1', initcap('item_bought1'))


df.withColumn('item_bought1',regexp_extract('item_bought','^[A-Za-z]+',0)).join(broadcast(df_ref), how='left', on='item_bought1').show()

+------------+--------------------+----------------+---------+
|item_bought1|                  id|     item_bought|    class|
+------------+--------------------+----------------+---------+
|        Soap|uiq7Zq52Bww4pZXc3xri|            Soap|  laundry|
|   Detergent|fpJatwxTeObcbuJH25UI|       Detergent|  laundry|
|        Milk|MdK1q5gBygIGFYyvbz8J|            Milk|groceries|
|        Soap|uiq7Zq52Bww4pZXc3xri|   Soap -&ju10kg|  laundry|
|   Detergent|fpJatwxTeObcbuJH25UI|Detergent x.ju2i|  laundry|
|        Milk|MdK1q5gBygIGFYyvbz8J|            Milk|groceries|

+------------+--------------------+---------- ------+----------+