如何使用 groupby 和聚合将 pyspark 数据框中的行与多列连接起来
How can I concatenate the rows in a pyspark dataframe with multiple columns using groupby and aggregate
我有一个包含多列的 pyspark 数据框。例如下图。
from pyspark.sql import Row
l = [('Jack',"a","p"),('Jack',"b","q"),('Bell',"c","r"),('Bell',"d","s")]
rdd = sc.parallelize(l)
score_rdd = rdd.map(lambda x: Row(name=x[0], letters1=x[1], letters2=x[2]))
score_card = sqlContext.createDataFrame(score_rdd)
+----+--------+--------+
|name|letters1|letters2|
+----+--------+--------+
|Jack| a| p|
|Jack| b| q|
|Bell| c| r|
|Bell| d| s|
+----+--------+--------+
现在我想按“名称”分组并连接两列每一行中的值。
我知道该怎么做,但假设有数千行,那么我的代码就会变得非常难看。
这是我的解决方案。
import pyspark.sql.functions as f
t = score_card.groupby("name").agg(
f.concat_ws("",collect_list("letters1").alias("letters1")),
f.concat_ws("",collect_list("letters2").alias("letters2"))
)
这是我将其保存为 CSV 文件时得到的输出。
+----+--------+--------+
|name|letters1|letters2|
+----+--------+--------+
|Jack| ab| pq|
|Bell| cd| rs|
+----+--------+--------+
但我主要关心的是这两行代码
f.concat_ws("",collect_list("letters1").alias("letters1")),
f.concat_ws("",collect_list("letters2").alias("letters2"))
如果有数千列,那么我将不得不重复上面的代码数千次。有没有更简单的解决方案,这样我就不必为每一列重复 f.concat_ws()?
我到处找了找也没找到解决方法
是的,您可以在 agg 函数中使用 for 循环并遍历 df.columns。如果有帮助,请告诉我。
from pyspark.sql import functions as F
df.show()
# +--------+--------+----+
# |letters1|letters2|name|
# +--------+--------+----+
# | a| p|Jack|
# | b| q|Jack|
# | c| r|Bell|
# | d| s|Bell|
# +--------+--------+----+
df.groupBy("name").agg( *[F.array_join(F.collect_list(column), "").alias(column) for column in df.columns if column !='name' ]).show()
# +----+--------+--------+
# |name|letters1|letters2|
# +----+--------+--------+
# |Bell| cd| rs|
# |Jack| ab| pq|
# +----+--------+--------+
我有一个包含多列的 pyspark 数据框。例如下图。
from pyspark.sql import Row
l = [('Jack',"a","p"),('Jack',"b","q"),('Bell',"c","r"),('Bell',"d","s")]
rdd = sc.parallelize(l)
score_rdd = rdd.map(lambda x: Row(name=x[0], letters1=x[1], letters2=x[2]))
score_card = sqlContext.createDataFrame(score_rdd)
+----+--------+--------+
|name|letters1|letters2|
+----+--------+--------+
|Jack| a| p|
|Jack| b| q|
|Bell| c| r|
|Bell| d| s|
+----+--------+--------+
现在我想按“名称”分组并连接两列每一行中的值。 我知道该怎么做,但假设有数千行,那么我的代码就会变得非常难看。 这是我的解决方案。
import pyspark.sql.functions as f
t = score_card.groupby("name").agg(
f.concat_ws("",collect_list("letters1").alias("letters1")),
f.concat_ws("",collect_list("letters2").alias("letters2"))
)
这是我将其保存为 CSV 文件时得到的输出。
+----+--------+--------+
|name|letters1|letters2|
+----+--------+--------+
|Jack| ab| pq|
|Bell| cd| rs|
+----+--------+--------+
但我主要关心的是这两行代码
f.concat_ws("",collect_list("letters1").alias("letters1")),
f.concat_ws("",collect_list("letters2").alias("letters2"))
如果有数千列,那么我将不得不重复上面的代码数千次。有没有更简单的解决方案,这样我就不必为每一列重复 f.concat_ws()?
我到处找了找也没找到解决方法
是的,您可以在 agg 函数中使用 for 循环并遍历 df.columns。如果有帮助,请告诉我。
from pyspark.sql import functions as F
df.show()
# +--------+--------+----+
# |letters1|letters2|name|
# +--------+--------+----+
# | a| p|Jack|
# | b| q|Jack|
# | c| r|Bell|
# | d| s|Bell|
# +--------+--------+----+
df.groupBy("name").agg( *[F.array_join(F.collect_list(column), "").alias(column) for column in df.columns if column !='name' ]).show()
# +----+--------+--------+
# |name|letters1|letters2|
# +----+--------+--------+
# |Bell| cd| rs|
# |Jack| ab| pq|
# +----+--------+--------+