如何使用 scala 在 databricks apache 中旋转列和行?

how to pivot columns and rows in databricks apache using scala?

我正在尝试将行转为列。我正在使用数据透视函数,但是当我使用它时,它只会给我完全相同的数据库,没有任何更改。代码运行良好,没有任何错误,但我想重新格式化数据并添加列属性和值,如下所示。非常感谢任何帮助!

 // current database table

    Census_block_group  B08007e1    B08007m1    B08007e2    B08007m2
    010010201001          291         95         291         95
    010010201002          678        143         663         139

// 我需要什么

    Census_block_group   attribute      value
     010010201001           B08007e1      678

//代码

import org.apache.spark.sql.SQLContext

spark.conf.set("spark.sql.pivotMaxValues", 999999)

 val df = censusBlocks.toDF
df.groupBy("B08007e1").pivot("census_block_group")
display(df)

您实际尝试做的实际上是 "unpivot" 而不是支点。 Spark 没有 unpivot 函数。相反,您可以使用堆栈功能。

import org.apache.spark.sql.functions._

val unPivotDF = df.select($"Census_block_group",
expr("stack(4, 'B08007e1', B08007e1, 'B08007m1', B08007m1, 'B08007e2', B08007e2, 'B08007m2', B08007e2) as (attribute,value)"))
unPivotDF.show()

您可以在此处找到有关使用堆栈功能的更多详细信息 - https://sparkbyexamples.com/how-to-pivot-table-and-unpivot-a-spark-dataframe/

你实际上想要 'Transpose' 而不是 'Pivot'。这是另一个解决方案(抱歉有点冗长):-)

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

object Whosebug3 {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Test").master("local").getOrCreate()

    val df = <YOUR ORIGINAL DATAFRAME>

    val transposed = transform(df, Array("Census_block_group"))

    transposed
      .withColumn("Attribute", col("ColVal.col1"))
      .withColumn("Value", col("ColVal.col2"))
      .drop("ColVal")
      .show()
  }

  def transform(df: DataFrame, fixedColumns: Array[String]): DataFrame = {

    val colsToTranspose = df.columns.diff(fixedColumns)

    val createCols = {

      colsToTranspose.foldLeft(Array.empty[Column]) {
        case (acc, name) => acc.:+(struct(lit(name).cast(StringType), col(name).cast(StringType)))
      }

    }

    df
      .withColumn("colVal", explode(array(createCols: _*)))
      .select(Array("Census_block_group", "colVal").map(col): _*)
  }

}