Transpose DataFrame Without Aggregation in Spark with Scala

I have looked at a number of different solutions online, but none of them achieve what I am after. Please help me solve this.

I am using Apache Spark 2.1.0 with Scala. Below is my dataframe:


+-----------+-------+
|COLUMN_NAME| VALUE |
+-----------+-------+
|col1       | val1  |
|col2       | val2  |
|col3       | val3  |
|col4       | val4  |
|col5       | val5  |
+-----------+-------+

I want to transpose it into the following:


+-----+-------+-----+------+-----+
|col1 | col2  |col3 | col4 |col5 |
+-----+-------+-----+------+-----+
|val1 | val2  |val3 | val4 |val5 |
+-----+-------+-----+------+-----+

You can do this with pivot, but you still need an aggregation. What happens, though, if you have multiple VALUEs for a single COLUMN_NAME?

import org.apache.spark.sql.functions.first

val df = Seq(
  ("col1", "val1"),
  ("col2", "val2"),
  ("col3", "val3"),
  ("col4", "val4"),
  ("col5", "val5")
).toDF("COLUMN_NAME", "VALUE")

df
  .groupBy()            // a single global group
  .pivot("COLUMN_NAME") // one output column per distinct COLUMN_NAME
  .agg(first("VALUE"))  // exactly one value per column here, so first() is safe
  .show()

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
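As for the multiple-values case: with pivot you would have to pick an aggregate that can hold them all. A minimal sketch (with made-up duplicate rows), assuming collect_list is acceptable so that each cell becomes an array of every value:

import org.apache.spark.sql.functions.collect_list

val dup = Seq(
  ("col1", "val1a"),
  ("col1", "val1b"),
  ("col2", "val2")
).toDF("COLUMN_NAME", "VALUE")

dup
  .groupBy()
  .pivot("COLUMN_NAME")
  .agg(collect_list("VALUE")) // keeps all values per column as an array
  .show(false)

// roughly:
// +--------------+------+
// |col1          |col2  |
// +--------------+------+
// |[val1a, val1b]|[val2]|
// +--------------+------+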

Edit:

If your dataframe really is as small as in your example, you can collect it as a Map:

val map = df.as[(String,String)].collect().toMap
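For the sample data this yields a plain Scala Map on the driver, Map(col1 -> val1, col2 -> val2, col3 -> val3, col4 -> val4, col5 -> val5), though a default Map does not guarantee iteration order.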

and then apply the foldLeft approach shown in the last answer below.

If your dataframe is small enough, as in the question, then you can collect COLUMN_NAME to form the schema, collect VALUE to form the rows, and then create a new dataframe, as follows:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.Row

// build the schema from the collected COLUMN_NAME values
val schema = StructType(df.select(collect_list("COLUMN_NAME")).first().getAs[Seq[String]](0).map(x => StructField(x, StringType)))
// build a single Row from the collected VALUE values
val values = sc.parallelize(Seq(Row.fromSeq(df.select(collect_list("VALUE")).first().getAs[Seq[String]](0))))
// create the transposed dataframe
sqlContext.createDataFrame(values, schema).show(false)

which should give you

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
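One caveat with this approach: the schema and the row come from two separate collect_list aggregations, and Spark does not guarantee that both see the rows in the same order. A hedged variant that collects the (COLUMN_NAME, VALUE) pairs together in a single pass, so the names and values cannot drift apart:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, collect_list, struct}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// collect name/value pairs in one pass so they stay aligned
val pairs = df
  .select(collect_list(struct(col("COLUMN_NAME"), col("VALUE"))))
  .first()
  .getAs[Seq[Row]](0)

val schema2 = StructType(pairs.map(p => StructField(p.getString(0), StringType)))
val row2    = Row.fromSeq(pairs.map(_.getString(1)))

sqlContext.createDataFrame(sc.parallelize(Seq(row2)), schema2).show(false)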

Another solution, using crosstab, although it is more verbose.

val dfp = spark.sql("""
  with t1 as (
    select 'col1' c1, 'val1' c2 union all
    select 'col2' c1, 'val2' c2 union all
    select 'col3' c1, 'val3' c2 union all
    select 'col4' c1, 'val4' c2 union all
    select 'col5' c1, 'val5' c2
  )
  select c1 COLUMN_NAME, c2 VALUE from t1
""")
dfp.show(50, false)

+-----------+-----+
|COLUMN_NAME|VALUE|
+-----------+-----+
|col1       |val1 |
|col2       |val2 |
|col3       |val3 |
|col4       |val4 |
|col5       |val5 |
+-----------+-----+

val dfp2=dfp.groupBy("column_name").agg( first($"value") as "value" ).stat.crosstab("value", "column_name")
dfp2.show(false)

+-----------------+----+----+----+----+----+
|value_column_name|col1|col2|col3|col4|col5|
+-----------------+----+----+----+----+----+
|val1             |1   |0   |0   |0   |0   |
|val3             |0   |0   |1   |0   |0   |
|val2             |0   |1   |0   |0   |0   |
|val5             |0   |0   |0   |0   |1   |
|val4             |0   |0   |0   |1   |0   |
+-----------------+----+----+----+----+----+
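Here stat.crosstab has built a contingency table: each cell counts how often that (value, column_name) pair occurs, which for this data means exactly one 1 per row and 0 everywhere else.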

val needed_cols = dfp2.columns.drop(1)

needed_cols: Array[String] = Array(col1, col2, col3, col4, col5)

val dfp3 = needed_cols.foldLeft(dfp2) { (acc,x) => acc.withColumn(x,expr(s"case when ${x}=1 then value_column_name else 0 end")) }
dfp3.show(false)

+-----------------+----+----+----+----+----+
|value_column_name|col1|col2|col3|col4|col5|
+-----------------+----+----+----+----+----+
|val1             |val1|0   |0   |0   |0   |
|val3             |0   |0   |val3|0   |0   |
|val2             |0   |val2|0   |0   |0   |
|val5             |0   |0   |0   |0   |val5|
|val4             |0   |0   |0   |val4|0   |
+-----------------+----+----+----+----+----+

dfp3.select( needed_cols.map( c => max(col(c)).as(c)) :_* ).show

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
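The final max works because the case expression makes every column a string (the literal 0 is coerced to the string '0'), and any 'valN' sorts lexicographically above '0', so max simply picks out the single real value in each column.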

To build on Ramesh Maharjan's answer, collect the dataframe and convert it into a Map.

val mp = df.as[(String,String)].collect.toMap

Using a dummy single-row dataframe, we can then build it up further with foldLeft:

val f = Seq("1").toDF("dummy")

mp.keys.toList.sorted.foldLeft(f) { (acc, x) => acc.withColumn(x, lit(mp(x))) }.drop("dummy").show(false)

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
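Putting it all together, here is a minimal reusable sketch of the Map-plus-foldLeft approach (the helper name transposeKV is mine, not from the answers above), assuming both columns are strings and the data fits on the driver:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
import spark.implicits._

def transposeKV(src: DataFrame, keyCol: String, valCol: String): DataFrame = {
  // pull the key/value pairs onto the driver
  val kv = src.select(keyCol, valCol).as[(String, String)].collect().toMap
  // grow one column per key on a single-row dummy frame, then drop the dummy
  kv.keys.toList.sorted.foldLeft(Seq("1").toDF("dummy")) { (acc, k) =>
    acc.withColumn(k, lit(kv(k)))
  }.drop("dummy")
}

transposeKV(df, "COLUMN_NAME", "VALUE").show(false)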