Get Cluster_ID and the rest of table using Spark MLlib KMeans

I have this dataset (I'm including just a few rows):

11.97,1355,401
3.49,25579,12908
9.29,129186,10882
28.73,10153,22356
3.69,22872,9798
13.49,160371,2911
24.36,106764,867
3.99,163670,16397
19.64,132547,401

I'm trying to assign all of these rows to 4 clusters using K-Means. For that, I'm using code I saw in a post:

val data = sc.textFile("/user/cloudera/TESTE1")
val idPointRDD = data.map(s => (s(0), Vectors.dense(s(1).toInt,s(2).toInt))).cache()
val clusters = KMeans.train(idPointRDD.map(_._2), 4, 20)
val clustersRDD = clusters.predict(idPointRDD.map(_._2))
val idClusterRDD = idPointRDD.map(_._1).zip(clustersRDD)
val idCluster = idClusterRDD.toDF("purchase","id","product","cluster")

I get this output:

scala> import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}

scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors

scala> val data = sc.textFile("/user/cloudera/TESTE")
data: org.apache.spark.rdd.RDD[String] = /user/cloudera/TESTE MapPartitionsRDD[7] at textFile at <console>:29

scala> val idPointRDD = data.map(s => (s(0), Vectors.dense(s(1).toInt,s(2).toInt))).cache()
idPointRDD: org.apache.spark.rdd.RDD[(Char, org.apache.spark.mllib.linalg.Vector)] = MapPartitionsRDD[8] at map at <console>:31

But when I run the rest of the code, I get the following error:

java.lang.UnsupportedOperationException: Schema for type Char is not supported
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:715)

How can I solve this?

Many thanks!

Here's the thing: you're reading the CSV values into an RDD of Strings without ever parsing them into numbers. A String is a collection of Chars, so when you call s(0) on a line you get its first character, a Char. A Char can indeed be converted to an integer or a double, but that's not what you're actually looking for.
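You can see this in the REPL: indexing into a String yields a Char, and converting that Char to an Int gives its character code (49 for '1'), not the numeric value you meant:

scala> val s = "11.97,1355,401"
s: String = 11.97,1355,401

scala> s(0)
res0: Char = 1

scala> s(0).toInt
res1: Int = 49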

You need to split each line of val data : RDD[String]:

 val data : RDD[String] = ???
 val idPointRDD = data.map { s =>
   s.split(",") match {
     // keep the first column as the id and build the feature
     // vector from the two remaining numeric columns
     case Array(id, y, z) => (id, Vectors.dense(y.toDouble, z.toDouble))
   }
 }.cache()

This should work for you!
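Putting it together with the rest of your pipeline, here is a minimal sketch, assuming the RDD-based spark.mllib API and the sqlContext provided by spark-shell (its implicits are needed for toDF). Note that toDF takes one column name per tuple field, so an RDD of (String, Int) pairs yields two columns, not four:

 import org.apache.spark.mllib.clustering.KMeans
 import org.apache.spark.mllib.linalg.Vectors
 import sqlContext.implicits._  // for toDF; sqlContext comes from spark-shell

 val data = sc.textFile("/user/cloudera/TESTE1")

 // parse each CSV line into (id, feature vector)
 val idPointRDD = data.map { s =>
   s.split(",") match {
     case Array(id, y, z) => (id, Vectors.dense(y.toDouble, z.toDouble))
   }
 }.cache()

 // train k-means with 4 clusters and 20 iterations
 val clusters = KMeans.train(idPointRDD.map(_._2), 4, 20)

 // predict a cluster for each point and zip the ids back on
 val predictions = clusters.predict(idPointRDD.map(_._2))
 val idCluster = idPointRDD.map(_._1).zip(predictions).toDF("id", "cluster")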