循环内的 RDD 转换
RDD transformation inside a loop
所以我有一个名为 Adat 的 rdd:Array[String],我想在一个循环中转换它并获得一个新的 RDD,我可以在循环外使用它 scope.I 尝试了这个但是结果不是我想要的。
val sharedA = {
for {
i <- 0 to shareA.toInt - 1
j <- 0 to shareA.toInt - 1
} yield {
Adat.map(x => (x(1).toInt, i % shareA.toInt, j % shareA.toInt, x(2)))
}
}
上面的代码将 SharedA rdd 转换为 IndexedSeq[RDD[(Int, Int, Int, String)]],当我尝试打印它时,结果是:
MapPartitionsRDD[12] at map at planet.scala:99
MapPartitionsRDD[13] at map at planet.scala:99 and so on.
如何将 sharedA 转换为 RDD[(Int, Int, Int, String)]
?
如果我这样做,sharedA 具有正确的数据类型,但我不能在范围之外使用它。
for { i <- 0 to shareA.toInt -1
j<-0 to shareA.toInt-1 }
yield {
val sharedA=Adat.map(x => (x(1).toInt,i % shareA.toInt ,j %
shareA.toInt,x(2)))
}
我不太明白你的描述,但 flatMap
应该可以解决问题:
val rdd = sc.parallelize(Seq(Array("", "0", "foo"), Array("", "1", "bar")))
val n = 2
val result = rdd.flatMap(xs => for {
i <- 0 to n
j <- 0 to n
} yield (xs(1).toInt, i, j, xs(2)))
result.take(5)
// Array[(Int, Int, Int, String)] =
// Array((0,0,0,foo), (0,0,1,foo), (0,0,2,foo), (0,1,0,foo), (0,1,1,foo))
不太常见的方法是对结果调用 SparkContext.union
:
val resultViaUnion = sc.union(for {
i <- 0 to n
j <- 0 to n
} yield rdd.map(xs => (xs(1).toInt, i, j, xs(2))))
resultViaUnion.take(5)
// Array[(Int, Int, Int, String)] =
// Array((0,0,0,foo), (1,0,0,bar), (0,0,1,foo), (1,0,1,bar), (0,0,2,foo))
所以我有一个名为 Adat 的 rdd:Array[String],我想在一个循环中转换它并获得一个新的 RDD,我可以在循环外使用它 scope.I 尝试了这个但是结果不是我想要的。
val sharedA = {
for {
i <- 0 to shareA.toInt - 1
j <- 0 to shareA.toInt - 1
} yield {
Adat.map(x => (x(1).toInt, i % shareA.toInt, j % shareA.toInt, x(2)))
}
}
上面的代码将 SharedA rdd 转换为 IndexedSeq[RDD[(Int, Int, Int, String)]],当我尝试打印它时,结果是:
MapPartitionsRDD[12] at map at planet.scala:99
MapPartitionsRDD[13] at map at planet.scala:99 and so on.
如何将 sharedA 转换为 RDD[(Int, Int, Int, String)]
?
如果我这样做,sharedA 具有正确的数据类型,但我不能在范围之外使用它。
for { i <- 0 to shareA.toInt -1
j<-0 to shareA.toInt-1 }
yield {
val sharedA=Adat.map(x => (x(1).toInt,i % shareA.toInt ,j %
shareA.toInt,x(2)))
}
我不太明白你的描述,但 flatMap
应该可以解决问题:
val rdd = sc.parallelize(Seq(Array("", "0", "foo"), Array("", "1", "bar")))
val n = 2
val result = rdd.flatMap(xs => for {
i <- 0 to n
j <- 0 to n
} yield (xs(1).toInt, i, j, xs(2)))
result.take(5)
// Array[(Int, Int, Int, String)] =
// Array((0,0,0,foo), (0,0,1,foo), (0,0,2,foo), (0,1,0,foo), (0,1,1,foo))
不太常见的方法是对结果调用 SparkContext.union
:
val resultViaUnion = sc.union(for {
i <- 0 to n
j <- 0 to n
} yield rdd.map(xs => (xs(1).toInt, i, j, xs(2))))
resultViaUnion.take(5)
// Array[(Int, Int, Int, String)] =
// Array((0,0,0,foo), (1,0,0,bar), (0,0,1,foo), (1,0,1,bar), (0,0,2,foo))