将 RDD 转换为具有固定长度文件数据的向量

converting RDD to vector with fixed length file data

我是 Spark + Scala 的新手,仍在培养我的直觉。我有一个包含许多数据样本的文件。每 2048 行代表一个新样本。我正在尝试将每个样本转换为向量,然后通过 k 均值聚类算法 运行。数据文件如下所示:

123.34  800.18
456.123 23.16
...

当我处理非常小的数据子集时,我从这样的文件创建一个 RDD:

val fileData = sc.textFile("hdfs://path/to/file.txt")

然后使用以下代码创建矢量:

val freqLineCount = 2048
val numSamples    = 200
val freqPowers = fileData.map( _.split(" ")(1).toDouble )
val allFreqs    = freqPowers.take(numSamples*freqLineCount).grouped(freqLineCount)
val lotsOfVecs  = allFreqs.map(spec => Vectors.dense(spec) ).toArray
val lotsOfVecsRDD = sc.parallelize( lotsOfVecs ).cache()

val numClusters = 2
val numIterations = 2
val clusters = KMeans.train(lotsOfVecsRDD, numClusters, numIterations)

这里的关键是我可以在一个字符串数组上调用 .grouped,它 returns 一个包含连续 2048 个值的数组。然后通过 KMeans 训练算法将其转换为向量并运行它是微不足道的。

我正在尝试 运行 在更大的数据集上运行此代码,运行 出现 java.lang.OutOfMemoryError: Java heap space 错误。大概是因为我在我的 freqPowers 变量上调用 take 方法,然后对该数据执行一些操作。

我将如何在这个数据集上实现 运行ning KMeans 的目标,同时牢记

  1. 每个数据样本在文件中每 2048 行出现一次(因此文件应该按顺序解析)

  2. 此代码需要 运行 在分布式集群上

  3. 我不需要 运行 内存不足:)

提前致谢

你可以这样做:

val freqLineCount = 2048
val freqPowers = fileData.flatMap(_.split(" ")(1).toDouble)

// Replacement of your current code.
val groupedRDD = freqPowers.zipWithIndex().groupBy(_._2 / freqLineCount)
val vectorRDD = groupedRDD.map(grouped => Vectors.dense(grouped._2.map(_._1).toArray))

val numClusters = 2
val numIterations = 2
val clusters = KMeans.train(vectorRDD, numClusters, numIterations)

替换代码使用 zipWithIndex() 和 long 的划分将 RDD 元素分组为 freqLineCount 个块。分组后,有问题的元素被提取到它们的实际向量中。