Grouping by values on a Spark DataFrame
I'm working with a Spark DataFrame that contains data like this:
A,1,2,3
B,1,2,3
C,1,2,3
D,4,2,3
I'd like to aggregate this data on the last three columns, so the output would be:
ABC,1,2,3
D,4,2,3
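How can I do this in Scala? (It's not a large DataFrame, so performance is secondary.)
As mentioned in the comments, you can first use groupBy to group on the last three columns and then use concat_ws on the first column. Here is one way to do it: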
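// imports needed outside spark-shell; assumes a SparkSession named spark
import org.apache.spark.sql.functions.{collect_list, concat_ws}
import spark.implicits._ // provides toDF and the $"..." column syntax
// create your original DF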
val df = Seq(("A",1,2,3),("B",1,2,3),("C",1,2,3),("D",4,2,3)).toDF("col1","col2","col3","col4")
df.show
//output
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   A|   1|   2|   3|
|   B|   1|   2|   3|
|   C|   1|   2|   3|
|   D|   4|   2|   3|
+----+----+----+----+
// group by col2, col3, col4, collect col1 into a list,
// then join that list into a single string
df.groupBy("col2","col3","col4")
  .agg(collect_list("col1").as("col1"))
  // the first argument of concat_ws is the separator; change it as needed
  .select(concat_ws("", $"col1") as "col1",$"col2",$"col3",$"col4").show
//output
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   D|   4|   2|   3|
| ABC|   1|   2|   3|
+----+----+----+----+
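One caveat with the approach above: collect_list does not guarantee the order of the collected elements after a shuffle, so "ABC" could in principle come out as "BAC". A minimal sketch that sorts the list before joining it, assuming alphabetical order within each group is what you want (the local SparkSession setup and the name grouped are only for illustration; in spark-shell the session already exists):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, concat_ws, sort_array}

// Local session for illustration only; skip this in spark-shell.
val spark = SparkSession.builder().master("local[*]").appName("group-concat").getOrCreate()
import spark.implicits._

val df = Seq(("A",1,2,3),("B",1,2,3),("C",1,2,3),("D",4,2,3))
  .toDF("col1","col2","col3","col4")

val grouped = df
  .groupBy("col2","col3","col4")
  // sort_array makes the order inside each group deterministic
  .agg(concat_ws("", sort_array(collect_list($"col1"))).as("col1"))
  .select("col1","col2","col3","col4")
  // orderBy only fixes the row order of the result
  .orderBy("col2","col3","col4")

grouped.show()
```

If the original row order matters rather than alphabetical order, sorting will not reproduce it; you would need an explicit ordering column for that.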
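Alternatively, in this case you can map each row to a key of (c2, c3, c4) and then concatenate the values with reduceByKey. Finally, a last map formats each row as needed. It should look something like this:
val data = sc.parallelize(List(
  ("A", "1", "2", "3"),
  ("B", "1", "2", "3"),
  ("C", "1", "2", "3"),
  ("D", "4", "2", "3")))
// key each row by its last three columns, keeping the first column as the value
val res = data.map { case (c1, c2, c3, c4) => ((c2, c3, c4), c1) }
  // concatenate the first-column values that share a key
  .reduceByKey((x, y) => x + y)
  // format each result as "c1,c2,c3,c4"
  .map(v => v._2 + "," + v._1.productIterator.mkString(","))
  .collect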
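Since the question says the data is small, the same key-then-concatenate logic can also be checked on plain Scala collections without a cluster. This is only a sketch to illustrate the idea, not part of either answer; the names rows, merged and lines are made up for the example:

```scala
// Same sample data as above, as a local List instead of an RDD.
val rows = List(
  ("A", "1", "2", "3"),
  ("B", "1", "2", "3"),
  ("C", "1", "2", "3"),
  ("D", "4", "2", "3"))

val merged = rows
  // group rows by their last three columns
  .groupBy { case (_, c2, c3, c4) => (c2, c3, c4) }
  // join the first-column values of each group into one string
  .map { case ((c2, c3, c4), group) => (group.map(_._1).mkString, c2, c3, c4) }
  .toList
  // a Map has no defined iteration order, so sort for a stable result
  .sortBy(_._1)

// format each row as "c1,c2,c3,c4"
val lines = merged.map { case (c1, c2, c3, c4) => s"$c1,$c2,$c3,$c4" }
lines.foreach(println)
```

On a local List, groupBy keeps the elements of each group in their original order, which reduceByKey on an RDD does not promise.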