Compute Cost of KMeans

I am using this model, which I did not write. To predict the centroids, I have to do this:

import cPickle

model = cPickle.load(open("/tmp/model_centroids_128d_pkl.lopq"))
codes = d.map(lambda x: (x[0], model.predict_coarse(x[1])))

where `d.first()` yields this:

(u'3768915289',
 array([ -86.00641097, -100.41325623,   <128 coords in total>]))

and `codes.first()`:

(u'3768915289', (5657, 7810))

How can I computeCost() this KMeans model?


After reading train_model.py, I am trying this:

In [23]: from pyspark.mllib.clustering import KMeans, KMeansModel
In [24]: Cs = model.Cs     # centroids
In [25]: model = KMeansModel(Cs[0]) # I am very positive this line is good
In [26]: costs = d.map(lambda x: model.computeCost(x[1]))
In [27]: costs.first()

but I get this error:

AttributeError: 'numpy.ndarray' object has no attribute 'map'

which means that Spark tries to use map() behind the scenes on x[1]...


So it needs an RDD!!! But in what form?

So I am trying:

d = d.map(lambda x: x[1])
d.first()
array([  7.17036494e+01,   1.07987890e+01, ...])
costs = model.computeCost(d)

and now I get this error:

16/08/30 00:39:21 WARN TaskSetManager: Lost task 821.0 in stage 40.0 : java.lang.IllegalArgumentException: requirement failed
    at scala.Predef$.require(Predef.scala:221)
    at org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:330)
    at org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:595)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest.apply(KMeans.scala:569)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest.apply(KMeans.scala:563)
    at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
    at org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:563)
    at org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:586)
    at org.apache.spark.mllib.clustering.KMeansModel$$anonfun$computeCost.apply(KMeansModel.scala:88)
    at org.apache.spark.mllib.clustering.KMeansModel$$anonfun$computeCost.apply(KMeansModel.scala:88)
    at scala.collection.Iterator$$anon.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.fold(TraversableOnce.scala:199)
    at scala.collection.AbstractIterator.fold(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$fold$$anonfun.apply(RDD.scala:1086)
    at org.apache.spark.rdd.RDD$$anonfun$fold$$anonfun.apply(RDD.scala:1086)
    at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:1951)
    at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:1951)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-44-6223595c8b5f> in <module>()
----> 1 costs = model.computeCost(d)

/home/gs/spark/current/python/pyspark/mllib/clustering.py in computeCost(self, rdd)
    140         """
    141         cost = callMLlibFunc("computeCostKmeansModel", rdd.map(_convert_to_vector),
--> 142                              [_convert_to_vector(c) for c in self.centers])
    143         return cost
    144 

/home/gs/spark/current/python/pyspark/mllib/common.py in callMLlibFunc(name, *args)
    128     sc = SparkContext.getOrCreate()
    129     api = getattr(sc._jvm.PythonMLLibAPI(), name)
--> 130     return callJavaFunc(sc, api, *args)
    131 
    132 

/home/gs/spark/current/python/pyspark/mllib/common.py in callJavaFunc(sc, func, *args)
    121     """ Call Java Function """
    122     args = [_py2java(sc, a) for a in args]
--> 123     return _java2py(sc, func(*args))
    124 
    125 

/home/gs/spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814 
    815         for temp_arg in temp_args:

/home/gs/spark/current/python/pyspark/sql/utils.py in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

/home/gs/spark/current/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    306                 raise Py4JJavaError(
    307                     "An error occurred while calling {0}{1}{2}.\n".
--> 308                     format(target_id, ".", name), value)
    309             else:
    310                 raise Py4JError(

Py4JJavaError: An error occurred while calling o25177.computeCostKmeansModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 821 in stage 40.0 failed 4 times, most recent failure: Lost task 821.3 in stage 40.0: java.lang.IllegalArgumentException: requirement failed
    at scala.Predef$.require(Predef.scala:221)
    at org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:330)
    at org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:595)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest.apply(KMeans.scala:569)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest.apply(KMeans.scala:563)
    at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
    at org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:563)
    at org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:586)
    at org.apache.spark.mllib.clustering.KMeansModel$$anonfun$computeCost.apply(KMeansModel.scala:88)
    at org.apache.spark.mllib.clustering.KMeansModel$$anonfun$computeCost.apply(KMeansModel.scala:88)
    at scala.collection.Iterator$$anon.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.fold(TraversableOnce.scala:199)
    at scala.collection.AbstractIterator.fold(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$fold$$anonfun.apply(RDD.scala:1086)
    at org.apache.spark.rdd.RDD$$anonfun$fold$$anonfun.apply(RDD.scala:1086)
    at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:1951)
    at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:1951)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
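The `requirement failed` is thrown by MLUtils.fastSquaredDistance, which requires both vectors to have the same size; here the data points are 128-dimensional while the centroids in `Cs[0]` are 64-dimensional. A minimal NumPy sketch of what computeCost() does (sum of squared distances from each point to its nearest centroid) makes the dimension requirement visible; `compute_cost` is my own stand-in, not a Spark function:

```python
import numpy as np

def compute_cost(points, centers):
    """Sum of squared distances from each point to its nearest center,
    which is what KMeansModel.computeCost returns."""
    points = np.asarray(points, dtype=float)
    centers = np.asarray(centers, dtype=float)
    # fastSquaredDistance requires matching dimensions -- same check here
    assert points.shape[1] == centers.shape[1], "requirement failed"
    # squared distance of every point to every center: shape (n_points, n_centers)
    d2 = ((points[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
    return d2.min(axis=1).sum()

points = np.array([[0.0, 0.0], [1.0, 1.0], [10.0, 10.0]])
centers = np.array([[0.0, 0.0], [10.0, 10.0]])
print(compute_cost(points, centers))  # 2.0: only (1, 1) is off-center
```

Feeding 128-d points to 64-d centers trips the assertion, exactly like the Scala `require`.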

Edit:

split_vecs = d.map(lambda x: np.split(x[1], 2))

seems like a good step, since the centroids have 64 dimensions.
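For reference, `np.split(x, 2)` on a 128-d vector yields two 64-d halves, matching the centroid dimensionality:

```python
import numpy as np

x = np.arange(128.0)             # stand-in for one 128-d descriptor from d
first, second = np.split(x, 2)   # two contiguous halves
print(first.shape, second.shape)  # (64,) (64,)
```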

model.computeCost((d.map(lambda x: x[1])).first())

gives this error: AttributeError: 'numpy.ndarray' object has no attribute 'map'

Apparently, from the documentation I have read, you have to:

  1. Create a model, either by reading a previously saved model or by fitting a new one.

  2. Once you have the model, you can use its method computeCost(), which needs a properly formatted RDD as input to produce anything useful.

So, assuming that your variable model is a KMeansModel and that the data stored in variable d has the expected representation, you should be able to run:

model.computeCost(d)

Edit:

You should create an RDD containing vectors of the same dimensionality as the centroids, and pass that as the input argument to computeCost(), e.g.:

split_vecs = d.map(lambda x: (x[0], np.split(x[1], 2)))
costs_per_split = [KMeansModel(model.Cs[i]).computeCost(split_vecs.map(lambda x: x[1][i])) for i in range(2)]
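The per-split computation above can be sketched without Spark. The shapes below (two codebooks of 8 centroids, 64 dimensions each, mirroring model.Cs) are illustrative assumptions, not the real model's values:

```python
import numpy as np

def cost(points, centers):
    # sum of squared distances to the nearest center, as computeCost does
    d2 = ((points[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
    return d2.min(axis=1).sum()

rng = np.random.default_rng(0)
vecs = rng.normal(size=(10, 128))                   # stand-in for d.map(lambda x: x[1])
Cs = [rng.normal(size=(8, 64)) for _ in range(2)]   # stand-in for model.Cs
halves = np.split(vecs, 2, axis=1)                  # two (10, 64) blocks, like split_vecs
costs_per_split = [cost(halves[i], Cs[i]) for i in range(2)]
print(costs_per_split)
```

Each half of every vector is scored against its own codebook, which is why the dimensions now line up and the `requirement failed` goes away.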