Spark KMeans 无法处理大数据吗?

Is Spark's KMeans unable to handle bigdata?

KMeans 的 training 有多个参数,初始化模式默认为 kmeans||。问题是它快速(不到 10 分钟)前进到前 13 个阶段,但随后 完全挂起 ,没有产生错误!

重现问题的最小示例(如果我使用 1000 点或随机初始化会成功):

from pyspark.context import SparkContext

from pyspark.mllib.clustering import KMeans
from pyspark.mllib.random import RandomRDDs


if __name__ == "__main__":
    sc = SparkContext(appName='kmeansMinimalExample')

    # same with 10000 points
    data = RandomRDDs.uniformVectorRDD(sc, 10000000, 64)
    C = KMeans.train(data, 8192,  maxIterations=10)    

    sc.stop()

作业没有执行任何操作(它没有成功、失败或进展..),如下所示。 Executors 选项卡中没有 active/failed 任务。 Stdout 和 Stderr Logs 没有什么特别有趣的:

如果我用k=81代替8192,会成功:

注意takeSample()的两次调用,因为在随机初始化的情况下调用了两次。

所以,这是怎么回事? Spark 的 Kmeans 无法缩放 吗?有人知道吗?你能重现吗?


如果是内存问题,I would get warnings and errors, as I had been before

注意:placeybordeaux 的评论是基于 客户端模式 中作业的执行,其中驱动程序的配置无效,导致退出代码 143 等(请参阅编辑历史记录) , 不是在集群模式下,根本没有错误报告,应用程序只是挂起


来自 zero323: 是相关的,但我认为他见证了一些进步,而我的挂起,我确实发表了评论...

我认为 'hanging' 是因为您的遗嘱执行人不断死亡。正如我在旁白中提到的,这段代码在 Pyspark 和 Scala 中对我来说运行良好,无论是在本地还是在集群上。但是,它需要的时间比应该的要长得多。几乎所有的时间都花在了k-means上||初始化。

我打开了https://issues.apache.org/jira/browse/SPARK-17389 to track two main improvements, one of which you can use now. Edit: really, see also https://issues.apache.org/jira/browse/SPARK-11560

首先,有一些代码优化可以将初始化速度提高大约 13%。

然而,大多数问题是它默认为 5 步 k-means|| init,似乎 2 几乎总是一样好。您可以将初始化步骤设置为 2 以查看加速,尤其是在现在挂起的阶段。

在我的笔记本电脑上进行的(较小的)测试中,初始化时间从 5:54 变为 1:41,这两个变化主要是由于设置了初始化步骤。

如果您的 RDD 太大,collectAsMap 将尝试将 RDD 中的每个元素复制到单个驱动程序中,然后 运行 内存不足并崩溃。即使您对数据进行了分区,collectAsMap 也会将所有内容发送给驱动程序,您的作业就会崩溃。 您可以通过调用 take 或 takeSample,或者过滤或采样您的 RDD 来确保 return 的元素数量受到限制。 同样,除非您确定您的数据集大小足够小以适合内存,否则也要谨慎对待这些其他操作:

按键计数, 按值计算, 收集

如果您确实需要 RDD 的这些值中的每一个并且数据太大而无法放入内存,您可以将 RDD 写入文件或将 RDD 导出到一个足够大的数据库来容纳所有的数据。当您使用 API 时,我认为您无法做到这一点(也许重写所有代码?增加内存?)。我认为 运行Algorithm 方法中的这个 collectAsMap 在 Kmeans (https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/dont_call_collect_on_a_very_large_rdd.html) 中是一件非常糟糕的事情...