如何避免使用 Spark 进行循环?

How to avoid for loop with Spark?

我是 spark 的新手,不了解 mapreduce 机制如何与 spark 一起工作。我有一个只有双打的 csv 文件,我想要的是用第一个向量和 rdd 的其余部分进行操作(计算欧几里德距离)。然后迭代其他向量。它以另一种方式存在吗?也许明智地使用笛卡尔积...

val rdd = sc.parallelize(Array((1,Vectors.dense(1,2)),(2,Vectors.dense(3,4),...)))
val array_vects = rdd.collect
val size = rdd.count
val emptyArray = Array((0,Vectors.dense(0))).tail
var rdd_rez = sc.parallelize(emptyArray)

for( ind <- 0 to size -1 ) {
   val vector = array_vects(ind)._2
   val rest = rdd.filter(x => x._1 != ind)
   val rdd_dist = rest.map( x => (x._1 , Vectors.sqdist(x._2,vector)))
   rdd_rez = rdd_rez ++ rdd_dist
}

感谢您的支持。

我不认为你为什么要尝试做那样的事情。您可以简单地按如下方式执行此操作。

val initialArray = Array( ( 1,Vectors.dense(1,2) ), ( 2,Vectors.dense(3,4) ),... )

val firstVector = initialArray( 0 )

val initialRdd = sc.parallelize( initialArray )

val euclideanRdd = initialRdd.map( { case ( i, vec ) => ( i, euclidean( firstVector, vec ) ) } )

我们在这里定义了一个函数 euclidean,它采用两个密集向量和 returns 欧氏距离。

可以使用 rdd.cartesian:

计算(所有向量对之间的)距离
val rdd = sc.parallelize(Array((1,Vectors.dense(1,2)),
                               (2,Vectors.dense(3,4)),...))
val product = rdd.cartesian(rdd)

val result = product.filter{ case ((a, b), (c, d)) => a != c }
                    .map   { case ((a, b), (c, d)) => 
                                   (a, Vectors.sqdist(b, d)) }