PySpark 在列上应用自定义函数

PySpark apply custom function on column

我想 运行 数据框列上的自定义函数。该列有一个长字符串,其中包含一些电子邮件。字符串格式是这样的:

"Don Joe<abc@bankatfirst.com>,  Matt Scheurer <def@bankatfirst.com>, Dan Lawler <ghi@bankatfirst.com>"

我必须使用 运行 正则表达式来提取电子邮件,然后我必须找出整个列中有多少独特的电子邮件。

我可以编写正则表达式并在 python 中创建唯一的电子邮件列表。但我不知道如何在 spark dataframe 上应用这个函数。我试过这样做:

all_names = set()

def get_distinct_users(userlist):
    global all_names
    for email in re.findall('\<\S*\>',userlist):
        all_names.add(email)

get_distinct_users_udf = udf(get_distinct_users,StringType())
users = users.withColumn("user_count",get_distinct_users_udf(users["users"]))

但是全局变量 all_names 没有得到更新。我应该应用 map 函数而不是创建 UDF,还是 reduce 因为它是一种聚合函数?

有什么想法吗?

您可以这样做的一种方法是 flatMap 一个函数来提取列上的电子邮件地址列表,例如

import re

def get_email(x):
  return re.findall("\<\S*\>", x)

uniqueEmails = users.select("users").rdd\
  .flatMap(lambda x: get_email(x[0]))\
  .distinct()

将是不同电子邮件地址的 RDD。