在 Scala 中将数据帧转换为 Spark mllib 矩阵
Convert dataframe into Spark mllib matrix in Scala
我有一个名为 df
的 Spark 数据框作为输入:
+---------------+---+---+---+---+
|Main_CustomerID| A1| A2| A3| A4|
+---------------+---+---+---+---+
| 101| 1| 0| 2| 1|
| 102| 0| 3| 1| 1|
| 103| 2| 1| 0| 0|
+---------------+---+---+---+---+
我需要将 A1
、A2
、A3
、A4
的值收集到 mllib 矩阵中,例如,
dm: org.apache.spark.mllib.linalg.Matrix =
1.0 0.0 2.0 1.0
0.0 3.0 1.0 1.0
2.0 1.0 0.0 0.0
我如何在 Scala 中实现这一点?
可以按如下方式进行,首先获取矩阵中应该包含的所有列:
import org.apache.spark.sql.functions._
val matrixColumns = df.columns.filter(_.startsWith("A")).map(col(_))
然后将数据帧转换为 RDD[Vector]
。由于向量需要包含双精度数,因此也需要在此处进行此转换。
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
val rdd = df.select(array(matrixColumns:_*).as("arr")).as[Array[Int]].rdd
.zipWithIndex()
.map{ case(arr, index) => IndexedRow(index, Vectors.dense(arr.map(_.toDouble)))}
然后将 rdd 转换为 IndexedRowMatrix
,如果需要,可以将其转换为本地矩阵:
val dm = new IndexedRowMatrix(rdd).toBlockMatrix().toLocalMatrix()
对于可以收集到驱动程序的较小的矩阵,有一个更简单的选择:
val matrixColumns = df.columns.filter(_.startsWith("A")).map(col(_))
val arr = df.select(array(matrixColumns:_*).as("arr")).as[Array[Int]]
.collect()
.flatten
.map(_.toDouble)
val rows = df.count().toInt
val cols = matrixColumns.length
// It's necessary to reverse cols and rows here and then transpose
val dm = Matrices.dense(cols, rows, arr).transpose()
我有一个名为 df
的 Spark 数据框作为输入:
+---------------+---+---+---+---+
|Main_CustomerID| A1| A2| A3| A4|
+---------------+---+---+---+---+
| 101| 1| 0| 2| 1|
| 102| 0| 3| 1| 1|
| 103| 2| 1| 0| 0|
+---------------+---+---+---+---+
我需要将 A1
、A2
、A3
、A4
的值收集到 mllib 矩阵中,例如,
dm: org.apache.spark.mllib.linalg.Matrix =
1.0 0.0 2.0 1.0
0.0 3.0 1.0 1.0
2.0 1.0 0.0 0.0
我如何在 Scala 中实现这一点?
可以按如下方式进行,首先获取矩阵中应该包含的所有列:
import org.apache.spark.sql.functions._
val matrixColumns = df.columns.filter(_.startsWith("A")).map(col(_))
然后将数据帧转换为 RDD[Vector]
。由于向量需要包含双精度数,因此也需要在此处进行此转换。
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
val rdd = df.select(array(matrixColumns:_*).as("arr")).as[Array[Int]].rdd
.zipWithIndex()
.map{ case(arr, index) => IndexedRow(index, Vectors.dense(arr.map(_.toDouble)))}
然后将 rdd 转换为 IndexedRowMatrix
,如果需要,可以将其转换为本地矩阵:
val dm = new IndexedRowMatrix(rdd).toBlockMatrix().toLocalMatrix()
对于可以收集到驱动程序的较小的矩阵,有一个更简单的选择:
val matrixColumns = df.columns.filter(_.startsWith("A")).map(col(_))
val arr = df.select(array(matrixColumns:_*).as("arr")).as[Array[Int]]
.collect()
.flatten
.map(_.toDouble)
val rows = df.count().toInt
val cols = matrixColumns.length
// It's necessary to reverse cols and rows here and then transpose
val dm = Matrices.dense(cols, rows, arr).transpose()