PySpark 在列上应用自定义函数
PySpark apply custom function on column
我想 运行 数据框列上的自定义函数。该列有一个长字符串,其中包含一些电子邮件。字符串格式是这样的:
"Don Joe<abc@bankatfirst.com>, Matt Scheurer <def@bankatfirst.com>, Dan Lawler <ghi@bankatfirst.com>"
我必须使用 运行 正则表达式来提取电子邮件,然后我必须找出整个列中有多少独特的电子邮件。
我可以编写正则表达式并在 python 中创建唯一的电子邮件列表。但我不知道如何在 spark dataframe 上应用这个函数。我试过这样做:
all_names = set()
def get_distinct_users(userlist):
global all_names
for email in re.findall('\<\S*\>',userlist):
all_names.add(email)
get_distinct_users_udf = udf(get_distinct_users,StringType())
users = users.withColumn("user_count",get_distinct_users_udf(users["users"]))
但是全局变量 all_names 没有得到更新。我应该应用 map 函数而不是创建 UDF,还是 reduce 因为它是一种聚合函数?
有什么想法吗?
您可以这样做的一种方法是 flatMap
一个函数来提取列上的电子邮件地址列表,例如
import re
def get_email(x):
return re.findall("\<\S*\>", x)
uniqueEmails = users.select("users").rdd\
.flatMap(lambda x: get_email(x[0]))\
.distinct()
将是不同电子邮件地址的 RDD。
我想 运行 数据框列上的自定义函数。该列有一个长字符串,其中包含一些电子邮件。字符串格式是这样的:
"Don Joe<abc@bankatfirst.com>, Matt Scheurer <def@bankatfirst.com>, Dan Lawler <ghi@bankatfirst.com>"
我必须使用 运行 正则表达式来提取电子邮件,然后我必须找出整个列中有多少独特的电子邮件。
我可以编写正则表达式并在 python 中创建唯一的电子邮件列表。但我不知道如何在 spark dataframe 上应用这个函数。我试过这样做:
all_names = set()
def get_distinct_users(userlist):
global all_names
for email in re.findall('\<\S*\>',userlist):
all_names.add(email)
get_distinct_users_udf = udf(get_distinct_users,StringType())
users = users.withColumn("user_count",get_distinct_users_udf(users["users"]))
但是全局变量 all_names 没有得到更新。我应该应用 map 函数而不是创建 UDF,还是 reduce 因为它是一种聚合函数?
有什么想法吗?
您可以这样做的一种方法是 flatMap
一个函数来提取列上的电子邮件地址列表,例如
import re
def get_email(x):
return re.findall("\<\S*\>", x)
uniqueEmails = users.select("users").rdd\
.flatMap(lambda x: get_email(x[0]))\
.distinct()
将是不同电子邮件地址的 RDD。