Aggregate key-values in order in Spark Scala
I am trying to implement a distributed singular value decomposition of a matrix A in Spark (Scala). I have managed to compute all elements of the product A.t*A (A.t being the transpose of A) as a transformation of an RDD, and I have it in the form RDD[((Int,Int),Double)]:
Array(((0,0),66.0), ((0,2),90.0), ((1,0),78.0), ((1,2),108.0), ((2,1),108.0), ((0,1),78.0), ((1,1),93.0), ((2,2),126.0), ((2,0),90.0))
where the key (j,k) indicates the row and column where the value should go in the matrix A.t*A.
In the end I would like the rows as a dense matrix (but I am open to other suggestions).
I tried using aggregateByKey on the first part of the tuple (which indicates which row of the matrix the value should be in), like this:
aggregateByKey(new HashSet[Double])(_+_,_++_)
but I don't get the elements in the right order within the rows of the final matrix.
Is there a good way to do this? I post my code below in case it is useful.
Thanks and kind regards.
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.IndexedRow
import scala.collection.mutable.ArrayBuffer
import scala.collection.immutable.HashSet
import breeze.linalg.DenseMatrix

// The example matrix as an RDD of indexed rows.
val A = sc.parallelize(Array(
  IndexedRow(0, Vectors.dense(1.0, 2.0, 3.0)),
  IndexedRow(1, Vectors.dense(4.0, 5.0, 6.0)),
  IndexedRow(2, Vectors.dense(7.0, 8.0, 9.0))))

// Maps an indexed row (a_1,...,a_n) to all products ((j,k), a_j*a_k).
def f(v: IndexedRow): Array[((Int, Int), Double)] = {
  val keyvaluepairs = ArrayBuffer[((Int, Int), Double)]()
  for (j <- 0 until v.vector.size) {
    for (k <- 0 until v.vector.size) {
      keyvaluepairs.append(((j, k), v.vector(j) * v.vector(k)))
    }
  }
  keyvaluepairs.toArray
}

// Map A to a key-value RDD where key = (j,k) and value = a_ij*a_ik.
val keyvalRDD = A.flatMap(row => f(row))

// Sum all key-value pairs that have the same key (j,k) (corresponds to the
// element of A.t*A on the j-th row and k-th column).
val keyvalSum = keyvalRDD.reduceByKey((x, y) => x + y)

// The keys are of the form (j,k); keep only the row index j.
val rowkeySum = keyvalSum.map(x => (x._1._1, x._2))

// This is where the order is lost: a HashSet has no order and also silently
// drops duplicate values within a row.
val mergeRows = rowkeySum.aggregateByKey(new HashSet[Double])(_ + _, _ ++ _)

// Throw away the keys, turn the rows into Arrays, and collect.
val Rows = mergeRows.map(x => x._2.toArray).collect()
val dm = DenseMatrix(Rows: _*)
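To see the problem, and one way around it, here is a minimal local sketch on plain Scala collections (no Spark; the grouping-and-sorting approach is my suggestion, not the question's code): keep the column index k alongside each value, group by row index j, and sort each row by k before dropping the keys. In Spark the same idea would be `keyvalSum.map { case ((j, k), v) => (j, (k, v)) }.groupByKey().mapValues(_.toSeq.sortBy(_._1).map(_._2))`.

```scala
// The summed entries from the question, in arbitrary order.
val entries = Seq(
  ((0, 0), 66.0), ((0, 2), 90.0), ((1, 0), 78.0), ((1, 2), 108.0),
  ((2, 1), 108.0), ((0, 1), 78.0), ((1, 1), 93.0), ((2, 2), 126.0), ((2, 0), 90.0))

// Group by row index j, then order each row by column index k.
val rows: Map[Int, Array[Double]] =
  entries
    .groupBy { case ((j, _), _) => j }
    .map { case (j, es) =>
      j -> es.sortBy { case ((_, k), _) => k }   // restore column order
             .map { case (_, v) => v }
             .toArray
    }

println(rows(0).mkString(" "))  // 66.0 78.0 90.0
```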
Try building the matrix with a CoordinateMatrix:
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import breeze.linalg.DenseMatrix

def calculate(sc: SparkContext) = {
  val matrix =
    sc.parallelize(Array(((0,0),66.0), ((0,2),90.0), ((1,0),78.0), ((1,2),108.0),
        ((2,1),108.0), ((0,1),78.0), ((1,1),93.0), ((2,2),126.0), ((2,0),90.0)))
      .map(el => MatrixEntry(el._1._1, el._1._2, el._2))

  val mat = new CoordinateMatrix(matrix)
  val m = mat.numRows()
  val n = mat.numCols()

  val result = DenseMatrix.zeros[Double](m.toInt, n.toInt)
  // Use an IndexedRowMatrix so each row carries its own index; collect() on a
  // plain RowMatrix does not guarantee that rows come back in order.
  mat.toIndexedRowMatrix().rows.collect().foreach { row =>
    row.vector.foreachActive { case (index, value) =>
      result(row.index.toInt, index) = value
    }
  }
  println("Result: " + result)
}
Result:
66.0 78.0 90.0
78.0 93.0 108.0
90.0 108.0 126.0
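As a sanity check, the result above really is A.t*A for the 3x3 example matrix from the question; entry (j,k) is the dot product of columns j and k of A. A minimal plain-Scala verification (no Spark):

```scala
// The example matrix A from the question.
val a = Array(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0), Array(7.0, 8.0, 9.0))
val n = a(0).length

// (A.t * A)(j)(k) = sum over rows i of a(i)(j) * a(i)(k).
val ata = Array.tabulate(n, n) { (j, k) =>
  a.map(row => row(j) * row(k)).sum
}

ata.foreach(row => println(row.mkString(" ")))
// 66.0 78.0 90.0
// 78.0 93.0 108.0
// 90.0 108.0 126.0
```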