
Transpose RDD[Vector] to change records to attributes for a CSV of size 500,000 x 50

I want to read a CSV file and transpose it to measure the correlation between attributes. But when I transpose it, I get the following error:

not enough arguments for method transpose: (implicit asTraversable: org.apache.spark.mllib.linalg.Vector => scala.collection.GenTraversableOnce[B])Seq[Seq[B]]. Unspecified value parameter asTraversable.

Error occurred in an application involving default arguments.

val file = "/data.csv"
val data = sc.textFile(file).map(line => Vectors.dense(line.split(",").map(_.toDouble).distinct))
val transposedData = sc.parallelize(data.collect.toSeq.transpose)
val correlMatrix: Matrix = Statistics.corr(transposedData, "pearson")
println(correlMatrix.toString)


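The `data` RDD is a collection of `org.apache.spark.mllib.linalg.Vector` objects, and a `Vector` is not a Scala collection. `transpose`, however, needs a collection of collections: its implicit parameter `asTraversable` must convert each element into a `GenTraversableOnce`, and no such conversion exists for `Vector`.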

`data.collect.toSeq` just gives you a `Seq[Vector]`, which therefore cannot be transposed.
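A minimal pure-Scala sketch of the difference (no Spark involved; `TransposeDemo`, `rows` and `columns` are illustrative names only): `transpose` compiles when each element can be implicitly viewed as a collection, and `Predef` provides such a view for `Array[Double]`. That is why it helps to keep each parsed line as an `Array[Double]` and transpose before wrapping anything in a `Vector`.

```scala
object TransposeDemo {
  // Each inner Array is one CSV record (row); each position is one attribute.
  val rows: Seq[Array[Double]] = Seq(
    Array(1.0, 2.0, 3.0),
    Array(4.0, 5.0, 6.0)
  )

  // Compiles because Predef supplies an implicit view from Array[Double]
  // to a Scala collection, which satisfies transpose's implicit parameter.
  // A Seq[org.apache.spark.mllib.linalg.Vector] has no such view, hence the error.
  val columns: Seq[Seq[Double]] = rows.transpose

  def main(args: Array[String]): Unit =
    columns.foreach(col => println(col.mkString(",")))
}
```

Each printed line is one attribute across all records: `1.0,4.0`, then `2.0,5.0`, then `3.0,6.0`.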

The following code should work for you:

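import org.apache.spark.mllib.linalg.{Matrix, Vectors}
import org.apache.spark.mllib.stat.Statistics

// Parse each line into a plain Array[Double] (no distinct); arrays can be transposed
val data = sc.textFile(file).map(line => line.split(",").map(_.toDouble))
val untransposedData = data.map(Vectors.dense(_))
// collect pulls every record to the driver; transpose then yields one Seq[Double] per attribute
val transposedData = sc.parallelize(data.collect.toSeq.transpose).map(x => Vectors.dense(x.toArray))
val correlMatrix: Matrix = Statistics.corr(transposedData, "pearson")
println(correlMatrix.toString)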
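One point worth checking against the MLlib documentation: `Statistics.corr(X, "pearson")` on an `RDD[Vector]` treats each vector as one observation and returns the correlation matrix comparing the columns of `X`. With the file read one record per line, `untransposedData` already has the 50 attributes as columns, so `Statistics.corr(untransposedData, "pearson")` should give the 50 x 50 attribute correlation matrix without any transpose, and without collecting all 500,000 × 50 = 25 million values to the driver. Passing the transposed RDD instead asks for correlations between the 500,000 records, i.e. a 500,000 × 500,000 matrix. The pure-Scala sketch below (no Spark; `ColumnPearson`, `pearson` and `corrMatrix` are hypothetical names) computes the same kind of column-wise Pearson matrix on a tiny in-memory dataset to show what that result contains.

```scala
object ColumnPearson {
  // Pearson correlation between two equally long sequences.
  def pearson(x: Seq[Double], y: Seq[Double]): Double = {
    require(x.nonEmpty && x.length == y.length, "inputs must be non-empty and equally long")
    val mx = x.sum / x.length
    val my = y.sum / y.length
    val cov = x.zip(y).map { case (a, b) => (a - mx) * (b - my) }.sum
    val sx = math.sqrt(x.map(a => (a - mx) * (a - mx)).sum)
    val sy = math.sqrt(y.map(b => (b - my) * (b - my)).sum)
    cov / (sx * sy)
  }

  // rows: one Array per record (observation); columns are attributes.
  // Returns an nCols x nCols matrix: entry (i, j) correlates attribute i with attribute j.
  def corrMatrix(rows: Seq[Array[Double]]): Array[Array[Double]] = {
    val cols: Seq[Seq[Double]] = rows.transpose // local transpose, fine for small data only
    Array.tabulate(cols.length, cols.length)((i, j) => pearson(cols(i), cols(j)))
  }

  // Attribute 1 is 2x attribute 0; attribute 2 falls as attribute 0 rises.
  val rows: Seq[Array[Double]] = Seq(
    Array(1.0, 2.0, 10.0),
    Array(2.0, 4.0, 8.0),
    Array(3.0, 6.0, 6.0)
  )
  val m: Array[Array[Double]] = corrMatrix(rows)

  def main(args: Array[String]): Unit =
    m.foreach(r => println(r.map(v => f"$v%.3f").mkString(" ")))
}
```

The result is 3 x 3 (one row and column per attribute), not 3 x 3 per record count; with 50 attributes the analogous matrix is 50 x 50 regardless of how many records there are.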

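Note: `distinct` was removed because applying it to each row can leave rows with different numbers of values, so the 2D matrix is no longer rectangular, which causes another problem: `transpose` fails on rows of unequal length, and dropped duplicates shift the remaining values under the wrong attributes.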
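A small pure-Scala sketch of that failure (hypothetical two-record data, no Spark): `distinct` removes the repeated `1.0` from the first record only, the rows end up with lengths 2 and 3, and the standard library's `transpose` rejects them with an `IllegalArgumentException`.

```scala
import scala.util.Try

object DistinctRaggedDemo {
  // Two CSV records with the same number of fields; the first contains a repeated value.
  val parsed: Seq[Array[Double]] = Seq(
    Array(1.0, 1.0, 2.0),
    Array(3.0, 4.0, 5.0)
  )

  // Per-row distinct, as in the question's code, shortens only the first row.
  val deduped: Seq[Array[Double]] = parsed.map(_.distinct)
  val rowLengths: Seq[Int] = deduped.map(_.length)

  // transpose requires all rows to have the same size; Try captures the exception.
  val transposeResult: Try[Seq[Seq[Double]]] = Try(deduped.transpose)

  def main(args: Array[String]): Unit = {
    println(rowLengths.mkString(","))
    println(transposeResult.isFailure)
  }
}
```

Even when every row happens to lose the same number of values, the lengths match but values no longer line up with their original attribute positions, so the correlations would be meaningless.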