如何使用 scala 在 databricks apache 中旋转列和行?
how to pivot columns and rows in databricks apache using scala?
我正在尝试将行转为列。我正在使用数据透视函数,但是当我使用它时,它只会给我完全相同的数据库,没有任何更改。代码运行良好,没有任何错误,但我想重新格式化数据并添加列属性和值,如下所示。非常感谢任何帮助!
// current database table
Census_block_group B08007e1 B08007m1 B08007e2 B08007m2
010010201001 291 95 291 95
010010201002 678 143 663 139
// 我需要什么
Census_block_group attribute value
010010201001 B08007e1 678
//代码
import org.apache.spark.sql.SQLContext
spark.conf.set("spark.sql.pivotMaxValues", 999999)
val df = censusBlocks.toDF
df.groupBy("B08007e1").pivot("census_block_group")
display(df)
您实际尝试做的实际上是 "unpivot" 而不是支点。 Spark 没有 unpivot 函数。相反,您可以使用堆栈功能。
import org.apache.spark.sql.functions._
val unPivotDF = df.select($"Census_block_group",
expr("stack(4, 'B08007e1', B08007e1, 'B08007m1', B08007m1, 'B08007e2', B08007e2, 'B08007m2', B08007e2) as (attribute,value)"))
unPivotDF.show()
您可以在此处找到有关使用堆栈功能的更多详细信息 - https://sparkbyexamples.com/how-to-pivot-table-and-unpivot-a-spark-dataframe/
你实际上想要 'Transpose' 而不是 'Pivot'。这是另一个解决方案(抱歉有点冗长):-)
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
object Whosebug3 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("Test").master("local").getOrCreate()
val df = <YOUR ORIGINAL DATAFRAME>
val transposed = transform(df, Array("Census_block_group"))
transposed
.withColumn("Attribute", col("ColVal.col1"))
.withColumn("Value", col("ColVal.col2"))
.drop("ColVal")
.show()
}
def transform(df: DataFrame, fixedColumns: Array[String]): DataFrame = {
val colsToTranspose = df.columns.diff(fixedColumns)
val createCols = {
colsToTranspose.foldLeft(Array.empty[Column]) {
case (acc, name) => acc.:+(struct(lit(name).cast(StringType), col(name).cast(StringType)))
}
}
df
.withColumn("colVal", explode(array(createCols: _*)))
.select(Array("Census_block_group", "colVal").map(col): _*)
}
}
我正在尝试将行转为列。我正在使用数据透视函数,但是当我使用它时,它只会给我完全相同的数据库,没有任何更改。代码运行良好,没有任何错误,但我想重新格式化数据并添加列属性和值,如下所示。非常感谢任何帮助!
// current database table
Census_block_group B08007e1 B08007m1 B08007e2 B08007m2
010010201001 291 95 291 95
010010201002 678 143 663 139
// 我需要什么
Census_block_group attribute value
010010201001 B08007e1 678
//代码
import org.apache.spark.sql.SQLContext
spark.conf.set("spark.sql.pivotMaxValues", 999999)
val df = censusBlocks.toDF
df.groupBy("B08007e1").pivot("census_block_group")
display(df)
您实际尝试做的实际上是 "unpivot" 而不是支点。 Spark 没有 unpivot 函数。相反,您可以使用堆栈功能。
import org.apache.spark.sql.functions._
val unPivotDF = df.select($"Census_block_group",
expr("stack(4, 'B08007e1', B08007e1, 'B08007m1', B08007m1, 'B08007e2', B08007e2, 'B08007m2', B08007e2) as (attribute,value)"))
unPivotDF.show()
您可以在此处找到有关使用堆栈功能的更多详细信息 - https://sparkbyexamples.com/how-to-pivot-table-and-unpivot-a-spark-dataframe/
你实际上想要 'Transpose' 而不是 'Pivot'。这是另一个解决方案(抱歉有点冗长):-)
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
object Whosebug3 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("Test").master("local").getOrCreate()
val df = <YOUR ORIGINAL DATAFRAME>
val transposed = transform(df, Array("Census_block_group"))
transposed
.withColumn("Attribute", col("ColVal.col1"))
.withColumn("Value", col("ColVal.col2"))
.drop("ColVal")
.show()
}
def transform(df: DataFrame, fixedColumns: Array[String]): DataFrame = {
val colsToTranspose = df.columns.diff(fixedColumns)
val createCols = {
colsToTranspose.foldLeft(Array.empty[Column]) {
case (acc, name) => acc.:+(struct(lit(name).cast(StringType), col(name).cast(StringType)))
}
}
df
.withColumn("colVal", explode(array(createCols: _*)))
.select(Array("Census_block_group", "colVal").map(col): _*)
}
}