Transpose DataFrame Without Aggregation in Spark with Scala
I have looked at many different solutions online, but none of them achieve what I'm trying to do. Please help me with this.
I am using Apache Spark 2.1.0 with Scala. Below is my dataframe:
+-----------+-------+
|COLUMN_NAME| VALUE |
+-----------+-------+
|col1 | val1 |
|col2 | val2 |
|col3 | val3 |
|col4 | val4 |
|col5 | val5 |
+-----------+-------+
I want to transpose it to the following:
+-----+-------+-----+------+-----+
|col1 | col2 |col3 | col4 |col5 |
+-----+-------+-----+------+-----+
|val1 | val2 |val3 | val4 |val5 |
+-----+-------+-----+------+-----+
You can do this with pivot, but you still need an aggregation. And what if you have multiple values for a COLUMN_NAME? One option for that case is sketched after the output below.
import spark.implicits._                     // for toDF on a local Seq
import org.apache.spark.sql.functions.first

val df = Seq(
  ("col1", "val1"),
  ("col2", "val2"),
  ("col3", "val3"),
  ("col4", "val4"),
  ("col5", "val5")
).toDF("COLUMN_NAME", "VALUE")

df.groupBy()
  .pivot("COLUMN_NAME")
  .agg(first("VALUE"))   // each group holds a single value, so first() is a safe pick
  .show()
+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
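If the same COLUMN_NAME can carry several values, one way to keep them all is to aggregate into an array instead of picking one. A minimal sketch, where dfMulti is a hypothetical input introduced here for illustration:
import org.apache.spark.sql.functions.collect_list

// Hypothetical input where col1 appears twice
val dfMulti = Seq(
  ("col1", "val1a"),
  ("col1", "val1b"),
  ("col2", "val2")
).toDF("COLUMN_NAME", "VALUE")

dfMulti
  .groupBy()
  .pivot("COLUMN_NAME")
  .agg(collect_list("VALUE"))   // keep every value as an array rather than choosing one
  .show(false)
// +--------------+------+
// |col1          |col2  |
// +--------------+------+
// |[val1a, val1b]|[val2]|
// +--------------+------+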
Edit:
If your dataframe is indeed as small as in your example, you can collect it as a Map:
val map = df.as[(String,String)].collect().toMap
and then apply it; one way is sketched just below.
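A minimal sketch of applying that map, assuming a SparkSession named spark is in scope (transposed is a name introduced here): build a single-row dataframe by selecting one literal column per map entry.
import org.apache.spark.sql.functions.lit

// One literal column per map entry; sorting the keys fixes the column order
val transposed = spark.range(1).select(
  map.toSeq.sortBy(_._1).map { case (k, v) => lit(v).as(k) }: _*
)
transposed.show()
// +----+----+----+----+----+
// |col1|col2|col3|col4|col5|
// +----+----+----+----+----+
// |val1|val2|val3|val4|val5|
// +----+----+----+----+----+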
If your dataframe is small enough, as in the question, then you can collect COLUMN_NAME to form the schema and collect VALUE to form the rows, and then create a new dataframe as follows:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.Row

// Build the schema from the COLUMN_NAME values of the existing dataframe
val schema = StructType(df.select(collect_list("COLUMN_NAME")).first().getAs[Seq[String]](0).map(x => StructField(x, StringType)))

// Build a single-row RDD[Row] from the collected VALUE column
val values = sc.parallelize(Seq(Row.fromSeq(df.select(collect_list("VALUE")).first().getAs[Seq[String]](0))))

// Create the transposed dataframe
sqlContext.createDataFrame(values, schema).show(false)
which should give you
+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
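The snippet above uses the older sc and sqlContext handles; on Spark 2.x the same idea can be written against the SparkSession entry point, collecting the frame only once. A sketch under that assumption (schema2, pairs, and row are names introduced here; spark.implicits._ is assumed in scope, as in spark-shell):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Collect once, then split the pairs into column names and row values
val pairs = df.as[(String, String)].collect()
val schema2 = StructType(pairs.map { case (name, _) => StructField(name, StringType) })
val row = Row.fromSeq(pairs.map(_._2).toSeq)

spark.createDataFrame(spark.sparkContext.parallelize(Seq(row)), schema2).show(false)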
Another solution, though verbose, uses crosstab.
val dfp = spark.sql(""" with t1 as (
 select 'col1' c1, 'val1' c2 union all
 select 'col2' c1, 'val2' c2 union all
 select 'col3' c1, 'val3' c2 union all
 select 'col4' c1, 'val4' c2 union all
 select 'col5' c1, 'val5' c2
) select c1 COLUMN_NAME, c2 VALUE from t1
""")
dfp.show(50,false)
+-----------+-----+
|COLUMN_NAME|VALUE|
+-----------+-----+
|col1 |val1 |
|col2 |val2 |
|col3 |val3 |
|col4 |val4 |
|col5 |val5 |
+-----------+-----+
val dfp2 = dfp.groupBy("column_name").agg(first($"value") as "value").stat.crosstab("value", "column_name")
dfp2.show(false)
+-----------------+----+----+----+----+----+
|value_column_name|col1|col2|col3|col4|col5|
+-----------------+----+----+----+----+----+
|val1 |1 |0 |0 |0 |0 |
|val3 |0 |0 |1 |0 |0 |
|val2 |0 |1 |0 |0 |0 |
|val5 |0 |0 |0 |0 |1 |
|val4 |0 |0 |0 |1 |0 |
+-----------------+----+----+----+----+----+
val needed_cols = dfp2.columns.drop(1)
needed_cols: Array[String] = Array(col1, col2, col3, col4, col5)
val dfp3 = needed_cols.foldLeft(dfp2) { (acc,x) => acc.withColumn(x,expr(s"case when ${x}=1 then value_column_name else 0 end")) }
dfp3.show(false)
+-----------------+----+----+----+----+----+
|value_column_name|col1|col2|col3|col4|col5|
+-----------------+----+----+----+----+----+
|val1 |val1|0 |0 |0 |0 |
|val3 |0 |0 |val3|0 |0 |
|val2 |0 |val2|0 |0 |0 |
|val5 |0 |0 |0 |0 |val5|
|val4 |0 |0 |0 |val4|0 |
+-----------------+----+----+----+----+----+
dfp3.select( needed_cols.map( c => max(col(c)).as(c)) :_* ).show
+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
To enhance Ramesh Maharjan's answer, collect the dataframe and convert it to a Map.
val mp = df.as[(String,String)].collect.toMap
Using a dummy dataframe, we can then build it further with foldLeft:
val f = Seq("1").toDF("dummy")

// Add one literal column per map entry, then drop the seed column
mp.keys.toList.sorted.foldLeft(f) { (acc, x) => acc.withColumn(x, lit(mp(x))) }.drop("dummy").show(false)
+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+