如何在 Spark 中找到每个分区的总和
How to find Sum at Each partition in Spark
我创建了 class 并使用 class 创建了 RDD。我想计算每个分区的 LoudnessRate(class 的成员)的总和。此总和稍后将用于计算每个分区的 Mean LoudnessRate。
我尝试了以下代码,但它不计算 Sum 和 returns 0.0。
我的密码是
object sparkBAT {
def main(args: Array[String]): Unit = {
val numPartitions = 3
val N = 50
val d = 5
val MinVal = -10
val MaxVal = 10
val conf = new SparkConf().setMaster(locally("local")).setAppName("spark Sum")
val sc = new SparkContext(conf)
val ba = List.fill(N)(new BAT(d, MinVal, MaxVal))
val rdd = sc.parallelize(ba, numPartitions)
var arrSum =Array.fill(numPartitions)(0.0) // Declare Array that will hold sum for each Partition
rdd.mapPartitionsWithIndex((k,iterator) => iterator.map(x => arrSum(k) += x.LoudnessRate)).collect()
arrSum foreach println
}
}
class BAT (dim:Int, min:Double, max:Double) extends Serializable {
val random = new Random()
var position : List[Double] = List.fill(dim) (random.nextDouble() * (max-min)+min )
var velocity :List[Double] = List.fill(dim)( math.random)
var PulseRate : Double = 0.1
var LoudnessRate :Double = 0.95
var frequency :Double = math.random
var fitness :Double = math.random
var BestPosition :List[Double] = List.fill(dim)(math.random)
var BestFitness :Double = math.random
}
问题是您使用的是在驱动程序中声明并在执行程序中更新的 arrSum(常规集合)。每当您这样做时,您都需要使用累加器。
This 应该有帮助
正在按要求将我的评论更改为答案。原评论
You are modifying arrSum in executor JVMs and printing its values in the dirver JVM. You can map the iterators to singleton iterators and use collect to move the values to the driver. Also, don't use iterator.map for side-effects, iterator.foreach is meant for that.
这是一个示例片段,说明如何操作。首先创建一个包含两个分区的 RDD,0 -> 1,2,3
和 1 -> 4,5
。当然,在实际代码中你不需要这个,但是由于 sc.parallelize
行为会根据环境而变化,这将始终创建统一的 RDDs 来重现:
object DemoPartitioner extends Partitioner {
override def numPartitions: Int = 2
override def getPartition(key: Any): Int = key match {
case num: Int => num
}
}
val rdd = sc
.parallelize(Seq((0, 1), (0, 2), (0, 3), (1, 4), (1, 5)))
.partitionBy(DemoPartitioner)
.map(_._2)
然后是真正的技巧:
val sumsByPartition = rdd.mapPartitionsWithIndex {
case (partitionNum, it) => Iterator.single(partitionNum -> it.sum)
}.collect().toMap
println(sumsByPartition)
输出:
Map(0 -> 6, 1 -> 9)
我创建了 class 并使用 class 创建了 RDD。我想计算每个分区的 LoudnessRate(class 的成员)的总和。此总和稍后将用于计算每个分区的 Mean LoudnessRate。 我尝试了以下代码,但它不计算 Sum 和 returns 0.0。 我的密码是
object sparkBAT {
def main(args: Array[String]): Unit = {
val numPartitions = 3
val N = 50
val d = 5
val MinVal = -10
val MaxVal = 10
val conf = new SparkConf().setMaster(locally("local")).setAppName("spark Sum")
val sc = new SparkContext(conf)
val ba = List.fill(N)(new BAT(d, MinVal, MaxVal))
val rdd = sc.parallelize(ba, numPartitions)
var arrSum =Array.fill(numPartitions)(0.0) // Declare Array that will hold sum for each Partition
rdd.mapPartitionsWithIndex((k,iterator) => iterator.map(x => arrSum(k) += x.LoudnessRate)).collect()
arrSum foreach println
}
}
class BAT (dim:Int, min:Double, max:Double) extends Serializable {
val random = new Random()
var position : List[Double] = List.fill(dim) (random.nextDouble() * (max-min)+min )
var velocity :List[Double] = List.fill(dim)( math.random)
var PulseRate : Double = 0.1
var LoudnessRate :Double = 0.95
var frequency :Double = math.random
var fitness :Double = math.random
var BestPosition :List[Double] = List.fill(dim)(math.random)
var BestFitness :Double = math.random
}
问题是您使用的是在驱动程序中声明并在执行程序中更新的 arrSum(常规集合)。每当您这样做时,您都需要使用累加器。
This 应该有帮助
正在按要求将我的评论更改为答案。原评论
You are modifying arrSum in executor JVMs and printing its values in the dirver JVM. You can map the iterators to singleton iterators and use collect to move the values to the driver. Also, don't use iterator.map for side-effects, iterator.foreach is meant for that.
这是一个示例片段,说明如何操作。首先创建一个包含两个分区的 RDD,0 -> 1,2,3
和 1 -> 4,5
。当然,在实际代码中你不需要这个,但是由于 sc.parallelize
行为会根据环境而变化,这将始终创建统一的 RDDs 来重现:
object DemoPartitioner extends Partitioner {
override def numPartitions: Int = 2
override def getPartition(key: Any): Int = key match {
case num: Int => num
}
}
val rdd = sc
.parallelize(Seq((0, 1), (0, 2), (0, 3), (1, 4), (1, 5)))
.partitionBy(DemoPartitioner)
.map(_._2)
然后是真正的技巧:
val sumsByPartition = rdd.mapPartitionsWithIndex {
case (partitionNum, it) => Iterator.single(partitionNum -> it.sum)
}.collect().toMap
println(sumsByPartition)
输出:
Map(0 -> 6, 1 -> 9)