Pyspark Dataframe 从以字符串作为元素列表的列中获取唯一元素
Pyspark Dataframe get unique elements from column with string as list of elements
我有一个数据框(通过从 Azure 中的多个 blob 加载创建),其中有一列是 ID 列表。
现在,我想要整个专栏中的唯一 ID 列表:
这是一个例子 -
df -
| col1 | col2 | col3 |
| "a" | "b" |"[q,r]"|
| "c" | "f" |"[s,r]"|
这是我预期的回复:
resp = [q, r, s]
知道怎么去吗?
我目前的方法是将 col3 中的字符串转换为 python 列表,然后可能以某种方式将它们展平。
但到目前为止我还做不到。我尝试在 pyspark 中使用用户定义的函数,但它们仅 return 字符串而不是列表。
FlatMaps 只能在 RDD 上工作,不能在 Dataframes 上工作,所以它们不合时宜。
也许我可以在从 RDD 到数据帧的转换过程中指定它。但不确定该怎么做。
我们可以将 UDF 与 collect_list 一起使用。我试过了,
>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import *
>>> from functools import reduce
>>> df = spark.createDataFrame([('a','b','[q,r]'),('c','f','[s,r]')],['col1','col2','col3'])
>>> df.show()
+----+----+-----+
|col1|col2| col3|
+----+----+-----+
| a| b|[q,r]|
| c| f|[s,r]|
+----+----+-----+
>>> udf1 = F.udf(lambda x : [v for v in reduce(lambda x,y : set(x+y),d) if v not in ['[',']',',']],ArrayType(StringType()))
## col3 value is string of list. we concat the strings and set over it which removes duplicates.
## Also, we have converted string to set, means it will return [ ] , as values( like '[',']',',').we remove those.
>>> df.select(udf1(F.collect_list('col3')).alias('col3')).first().col3
['q', 'r', 's']
不确定性能。希望这对您有所帮助。!
这是一个仅使用 DataFrame 函数的方法:
df = spark.createDataFrame([('a','b','[q,r,p]'),('c','f','[s,r]')],['col1','col2','col3'])
df=df.withColumn('col4', f.split(f.regexp_extract('col3', '\[(.*)\]',1), ','))
df.select(f.explode('col4').alias('exploded')).groupby('exploded').count().show()
我有一个数据框(通过从 Azure 中的多个 blob 加载创建),其中有一列是 ID 列表。 现在,我想要整个专栏中的唯一 ID 列表:
这是一个例子 -
df -
| col1 | col2 | col3 |
| "a" | "b" |"[q,r]"|
| "c" | "f" |"[s,r]"|
这是我预期的回复:
resp = [q, r, s]
知道怎么去吗?
我目前的方法是将 col3 中的字符串转换为 python 列表,然后可能以某种方式将它们展平。
但到目前为止我还做不到。我尝试在 pyspark 中使用用户定义的函数,但它们仅 return 字符串而不是列表。
FlatMaps 只能在 RDD 上工作,不能在 Dataframes 上工作,所以它们不合时宜。
也许我可以在从 RDD 到数据帧的转换过程中指定它。但不确定该怎么做。
我们可以将 UDF 与 collect_list 一起使用。我试过了,
>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import *
>>> from functools import reduce
>>> df = spark.createDataFrame([('a','b','[q,r]'),('c','f','[s,r]')],['col1','col2','col3'])
>>> df.show()
+----+----+-----+
|col1|col2| col3|
+----+----+-----+
| a| b|[q,r]|
| c| f|[s,r]|
+----+----+-----+
>>> udf1 = F.udf(lambda x : [v for v in reduce(lambda x,y : set(x+y),d) if v not in ['[',']',',']],ArrayType(StringType()))
## col3 value is string of list. we concat the strings and set over it which removes duplicates.
## Also, we have converted string to set, means it will return [ ] , as values( like '[',']',',').we remove those.
>>> df.select(udf1(F.collect_list('col3')).alias('col3')).first().col3
['q', 'r', 's']
不确定性能。希望这对您有所帮助。!
这是一个仅使用 DataFrame 函数的方法:
df = spark.createDataFrame([('a','b','[q,r,p]'),('c','f','[s,r]')],['col1','col2','col3'])
df=df.withColumn('col4', f.split(f.regexp_extract('col3', '\[(.*)\]',1), ','))
df.select(f.explode('col4').alias('exploded')).groupby('exploded').count().show()