Spark RDD 操作类似于 top 返回一个较小的 RDD
Spark RDD operation like top returning a smaller RDD
我正在寻找类似 top
或 takeOrdered
的 Spark RDD 操作,但是 returns 另一个 RDD,而不是数组,也就是说,不会收集完整的结果内存。
它可以是一系列操作,但理想情况下,没有任何步骤试图将完整结果收集到单个节点的内存中。
看看
import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
val rdd: RDD[(String, Int)] // the first string is the key, the rest is the value
val topByKey:RDD[(String, Array[Int])] = rdd.topByKey(n)
或使用 aggregate
和 BoundedPriorityQueue
。
假设您想要 RDD 的前 50%。
def top50(rdd: RDD[(Double, String)]) = {
val sorted = rdd.sortByKey(ascending = false)
val partitions = sorted.partitions.size
// Throw away the contents of the lower partitions.
sorted.mapPartitionsWithIndex { (pid, it) =>
if (pid <= partitions / 2) it else Nil
}
}
这是一个近似值 — 您可能会多于或少于 50%。您可以做得更好,但需要对 RDD 进行额外评估。对于我想到的用例,这不值得。
我正在寻找类似 top
或 takeOrdered
的 Spark RDD 操作,但是 returns 另一个 RDD,而不是数组,也就是说,不会收集完整的结果内存。
它可以是一系列操作,但理想情况下,没有任何步骤试图将完整结果收集到单个节点的内存中。
看看
import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
val rdd: RDD[(String, Int)] // the first string is the key, the rest is the value
val topByKey:RDD[(String, Array[Int])] = rdd.topByKey(n)
或使用 aggregate
和 BoundedPriorityQueue
。
假设您想要 RDD 的前 50%。
def top50(rdd: RDD[(Double, String)]) = {
val sorted = rdd.sortByKey(ascending = false)
val partitions = sorted.partitions.size
// Throw away the contents of the lower partitions.
sorted.mapPartitionsWithIndex { (pid, it) =>
if (pid <= partitions / 2) it else Nil
}
}
这是一个近似值 — 您可能会多于或少于 50%。您可以做得更好,但需要对 RDD 进行额外评估。对于我想到的用例,这不值得。