Spark Scala:在多个 RDD 之间拆分每一行

Spark Scala: Split each line between multiple RDDs

我在 HDFS 上有一个文件,格式如下:

61,139,75
63,140,77
64,129,82
68,128,56
71,140,47
73,141,38
75,128,59
64,129,61
64,129,80
64,129,99

我从中创建一个 RDD,并用它们的索引压缩元素:

val data = sc.textFile("hdfs://localhost:54310/usrp/sample.txt")
val points = data.map(s => Vectors.dense(s.split(',').map(_.toDouble)))
val indexed = points.zipWithIndex()
val indexedData = indexed.map{case (value,index) => (index,value)}

现在我需要用索引和每行的前两个元素创建 rdd1。然后需要用每行的索引和第三个元素创建 rdd2 。我是 Scala 的新手,你能帮我看看怎么做吗?

这不起作用,因为 y 不是 Vector 类型,而是 org.apache.spark.mllib.linalg.Vector

val rdd1 = indexedData.map{case (x,y) => (x,y.take(2))}

基本上如何获得这种向量的前两个元素?

谢谢。

您可以通过以下步骤实现上述输出:

原始数据:

indexedData.foreach(println)
(0,[61.0,139.0,75.0])
(1,[63.0,140.0,77.0])
(2,[64.0,129.0,82.0])
(3,[68.0,128.0,56.0])
(4,[71.0,140.0,47.0])
(5,[73.0,141.0,38.0])
(6,[75.0,128.0,59.0])
(7,[64.0,129.0,61.0])
(8,[64.0,129.0,80.0])
(9,[64.0,129.0,99.0])

RRD1 数据:

每行的前两个元素都有索引。

val rdd1 = indexedData.map{case (x,y) => (x, (y.toArray(0), y.toArray(1)))}
rdd1.foreach(println)
(0,(61.0,139.0))
(1,(63.0,140.0))
(2,(64.0,129.0))
(3,(68.0,128.0))
(4,(71.0,140.0))
(5,(73.0,141.0))
(6,(75.0,128.0))
(7,(64.0,129.0))
(8,(64.0,129.0))
(9,(64.0,129.0))

RRD2 数据:

具有索引和行的第三个元素。

val rdd2 = indexedData.map{case (x,y) => (x, y.toArray(2))}
rdd2.foreach(println)
(0,75.0)
(1,77.0)
(2,82.0)
(3,56.0)
(4,47.0)
(5,38.0)
(6,59.0)
(7,61.0)
(8,80.0)
(9,99.0)

您可以利用DenseVectorunapply方法获取模式匹配中的底层Array[Double],然后调用take/drop 在 Array 上,用 Vector 重新包装它:

val rdd1 = indexedData.map { case (i, DenseVector(arr)) => (i, Vectors.dense(arr.take(2))) }
val rdd2 = indexedData.map { case (i, DenseVector(arr)) => (i, Vectors.dense(arr.drop(2))) }

如您所见 - 这意味着您创建的原始 DenseVector 并不是那么有用,因此如果您不打算在其他任何地方使用 indexedData,则可能更好首先将 indexedData 创建为 RDD[(Long, Array[Double])]

val points = data.map(s => s.split(',').map(_.toDouble))
val indexedData: RDD[(Long, Array[Double])] = points.zipWithIndex().map(_.swap)

val rdd1 = indexedData.mapValues(arr => Vectors.dense(arr.take(2)))
val rdd2 = indexedData.mapValues(arr => Vectors.dense(arr.drop(2))) 

最后提示:您可能想在 indexedData 上调用 .cache(),然后再扫描两次以创建 rdd1rdd2 - 否则文件将被加载和解析两次。