Spark Scala:在多个 RDD 之间拆分每一行
Spark Scala: Split each line between multiple RDDs
我在 HDFS 上有一个文件,格式如下:
61,139,75
63,140,77
64,129,82
68,128,56
71,140,47
73,141,38
75,128,59
64,129,61
64,129,80
64,129,99
我从中创建一个 RDD,并用它们的索引压缩元素:
val data = sc.textFile("hdfs://localhost:54310/usrp/sample.txt")
val points = data.map(s => Vectors.dense(s.split(',').map(_.toDouble)))
val indexed = points.zipWithIndex()
val indexedData = indexed.map{case (value,index) => (index,value)}
现在我需要用索引和每行的前两个元素创建 rdd1
。然后需要用每行的索引和第三个元素创建 rdd2
。我是 Scala 的新手,你能帮我看看怎么做吗?
这不起作用,因为 y
不是 Vector
类型,而是 org.apache.spark.mllib.linalg.Vector
val rdd1 = indexedData.map{case (x,y) => (x,y.take(2))}
基本上如何获得这种向量的前两个元素?
谢谢。
您可以通过以下步骤实现上述输出:
原始数据:
indexedData.foreach(println)
(0,[61.0,139.0,75.0])
(1,[63.0,140.0,77.0])
(2,[64.0,129.0,82.0])
(3,[68.0,128.0,56.0])
(4,[71.0,140.0,47.0])
(5,[73.0,141.0,38.0])
(6,[75.0,128.0,59.0])
(7,[64.0,129.0,61.0])
(8,[64.0,129.0,80.0])
(9,[64.0,129.0,99.0])
RRD1 数据:
每行的前两个元素都有索引。
val rdd1 = indexedData.map{case (x,y) => (x, (y.toArray(0), y.toArray(1)))}
rdd1.foreach(println)
(0,(61.0,139.0))
(1,(63.0,140.0))
(2,(64.0,129.0))
(3,(68.0,128.0))
(4,(71.0,140.0))
(5,(73.0,141.0))
(6,(75.0,128.0))
(7,(64.0,129.0))
(8,(64.0,129.0))
(9,(64.0,129.0))
RRD2 数据:
具有索引和行的第三个元素。
val rdd2 = indexedData.map{case (x,y) => (x, y.toArray(2))}
rdd2.foreach(println)
(0,75.0)
(1,77.0)
(2,82.0)
(3,56.0)
(4,47.0)
(5,38.0)
(6,59.0)
(7,61.0)
(8,80.0)
(9,99.0)
您可以利用DenseVector
的unapply
方法获取模式匹配中的底层Array[Double]
,然后调用take
/drop
在 Array 上,用 Vector 重新包装它:
val rdd1 = indexedData.map { case (i, DenseVector(arr)) => (i, Vectors.dense(arr.take(2))) }
val rdd2 = indexedData.map { case (i, DenseVector(arr)) => (i, Vectors.dense(arr.drop(2))) }
如您所见 - 这意味着您创建的原始 DenseVector
并不是那么有用,因此如果您不打算在其他任何地方使用 indexedData
,则可能更好首先将 indexedData
创建为 RDD[(Long, Array[Double])]
:
val points = data.map(s => s.split(',').map(_.toDouble))
val indexedData: RDD[(Long, Array[Double])] = points.zipWithIndex().map(_.swap)
val rdd1 = indexedData.mapValues(arr => Vectors.dense(arr.take(2)))
val rdd2 = indexedData.mapValues(arr => Vectors.dense(arr.drop(2)))
最后提示:您可能想在 indexedData
上调用 .cache()
,然后再扫描两次以创建 rdd1
和 rdd2
- 否则文件将被加载和解析两次。
我在 HDFS 上有一个文件,格式如下:
61,139,75
63,140,77
64,129,82
68,128,56
71,140,47
73,141,38
75,128,59
64,129,61
64,129,80
64,129,99
我从中创建一个 RDD,并用它们的索引压缩元素:
val data = sc.textFile("hdfs://localhost:54310/usrp/sample.txt")
val points = data.map(s => Vectors.dense(s.split(',').map(_.toDouble)))
val indexed = points.zipWithIndex()
val indexedData = indexed.map{case (value,index) => (index,value)}
现在我需要用索引和每行的前两个元素创建 rdd1
。然后需要用每行的索引和第三个元素创建 rdd2
。我是 Scala 的新手,你能帮我看看怎么做吗?
这不起作用,因为 y
不是 Vector
类型,而是 org.apache.spark.mllib.linalg.Vector
val rdd1 = indexedData.map{case (x,y) => (x,y.take(2))}
基本上如何获得这种向量的前两个元素?
谢谢。
您可以通过以下步骤实现上述输出:
原始数据:
indexedData.foreach(println)
(0,[61.0,139.0,75.0])
(1,[63.0,140.0,77.0])
(2,[64.0,129.0,82.0])
(3,[68.0,128.0,56.0])
(4,[71.0,140.0,47.0])
(5,[73.0,141.0,38.0])
(6,[75.0,128.0,59.0])
(7,[64.0,129.0,61.0])
(8,[64.0,129.0,80.0])
(9,[64.0,129.0,99.0])
RRD1 数据:
每行的前两个元素都有索引。
val rdd1 = indexedData.map{case (x,y) => (x, (y.toArray(0), y.toArray(1)))}
rdd1.foreach(println)
(0,(61.0,139.0))
(1,(63.0,140.0))
(2,(64.0,129.0))
(3,(68.0,128.0))
(4,(71.0,140.0))
(5,(73.0,141.0))
(6,(75.0,128.0))
(7,(64.0,129.0))
(8,(64.0,129.0))
(9,(64.0,129.0))
RRD2 数据:
具有索引和行的第三个元素。
val rdd2 = indexedData.map{case (x,y) => (x, y.toArray(2))}
rdd2.foreach(println)
(0,75.0)
(1,77.0)
(2,82.0)
(3,56.0)
(4,47.0)
(5,38.0)
(6,59.0)
(7,61.0)
(8,80.0)
(9,99.0)
您可以利用DenseVector
的unapply
方法获取模式匹配中的底层Array[Double]
,然后调用take
/drop
在 Array 上,用 Vector 重新包装它:
val rdd1 = indexedData.map { case (i, DenseVector(arr)) => (i, Vectors.dense(arr.take(2))) }
val rdd2 = indexedData.map { case (i, DenseVector(arr)) => (i, Vectors.dense(arr.drop(2))) }
如您所见 - 这意味着您创建的原始 DenseVector
并不是那么有用,因此如果您不打算在其他任何地方使用 indexedData
,则可能更好首先将 indexedData
创建为 RDD[(Long, Array[Double])]
:
val points = data.map(s => s.split(',').map(_.toDouble))
val indexedData: RDD[(Long, Array[Double])] = points.zipWithIndex().map(_.swap)
val rdd1 = indexedData.mapValues(arr => Vectors.dense(arr.take(2)))
val rdd2 = indexedData.mapValues(arr => Vectors.dense(arr.drop(2)))
最后提示:您可能想在 indexedData
上调用 .cache()
,然后再扫描两次以创建 rdd1
和 rdd2
- 否则文件将被加载和解析两次。