如果列表中的值在另一列中,则 Pyspark 更改列值

Pyspark change column value if value from list is in another column

我有一个这样的数据框:

+-------+----------------+
|Name   |Source          |
+-------+----------------+
|Tom    |clientA-incoming|
|Dick   |clientB-incoming|
|Harry  |c-abc-incoming  |

我想添加一列 slug 以结束此数据框:

+-------+----------------+--------+
|Name   |Source          |slug    |
+-------+----------------+--------+
|Tom    |clientA-incoming|clientA |
|Dick   |clientB-incoming|clientB |
|Harry  |c-abc-incoming  |c-abc   |

我有一个包含 slug 的值列表:

slugs = ['clientA', 'clientB', 'c-abc']

我基本上是按照这个伪代码的思路思考的:

for i in slugs:
    if i in df['Source']:
        df['Slug'] = i

谁能帮我冲过终点线?

编辑:

我想用 slugs 列表中的值更新 slug 列。进入 slug 列的具体值是根据 Source 列确定的。

例如,由于 slugs[0] = 'clientA' 和 clientA 是 clientA-incoming 的子字符串,我想将 slug 列中该行的值更新为 clientA

这可以根据您的要求使用左连接或内连接来解决:

from pyspark.sql.functions import broadcast

slugs = ['clientA', 'clientB', 'c-abc', 'f-gd']
sdf = spark.createDataFrame(slugs, "string").withColumnRenamed("value", "slug")

df = spark.createDataFrame([
  ["Tom", "clientA-incoming"],
  ["Dick", "clientB-incoming"],
  ["Harry", "c-abc-incoming"],
  ["Harry", "c-dgl-incoming"]
], ["Name", "Source"])

df.join(broadcast(sdf), df["Source"].contains(sdf["slug"]), "left").show()

# +-----+----------------+-------+
# | Name|          Source|   slug|
# +-----+----------------+-------+
# |  Tom|clientA-incoming|clientA|
# | Dick|clientB-incoming|clientB|
# |Harry|  c-abc-incoming|  c-abc|
# |Harry|  c-dgl-incoming|   null|
# +-----+----------------+-------+

请注意,我们广播较小的 df 以防止混洗。