在Spark中将一个RDD严格划分为多个RDD
Strict partition an RDD into multiple RDDs in Spark
我有一个带有 n
个分区的 rdd,我想将这个 rdd 分成 k
个 rdd,这样
rdd = rdd_1.union(rdd_2).union(rdd_3)...union(rdd_k)
例如,如果 n=10
和 k=2
我想以 2 个 rdds 结束,其中 rdd1 由 5 个分区组成,rdd2 由其他 5 个分区组成。
在 Spark 中执行此操作的最有效方法是什么?
您可以尝试这样的操作:
val rdd: RDD[T] = ???
val k: Integer = ???
val n = rdd.partitions.size
val rdds = (0 until n) // Create Seq of partitions numbers
.grouped(n / k) // group it into fixed sized buckets
.map(idxs => (idxs.head, idxs.last)) // Take the first and the last idx
.map {
case(min, max) => rdd.mapPartitionsWithIndex(
// If partition in [min, max] range keep its iterator
// otherwise return empty-one
(i, iter) => if (i >= min & i <= max) iter else Iterator()
)
}
如果输入 RDD
具有复杂的依赖关系,您应该在应用它之前缓存它。
我有一个带有 n
个分区的 rdd,我想将这个 rdd 分成 k
个 rdd,这样
rdd = rdd_1.union(rdd_2).union(rdd_3)...union(rdd_k)
例如,如果 n=10
和 k=2
我想以 2 个 rdds 结束,其中 rdd1 由 5 个分区组成,rdd2 由其他 5 个分区组成。
在 Spark 中执行此操作的最有效方法是什么?
您可以尝试这样的操作:
val rdd: RDD[T] = ???
val k: Integer = ???
val n = rdd.partitions.size
val rdds = (0 until n) // Create Seq of partitions numbers
.grouped(n / k) // group it into fixed sized buckets
.map(idxs => (idxs.head, idxs.last)) // Take the first and the last idx
.map {
case(min, max) => rdd.mapPartitionsWithIndex(
// If partition in [min, max] range keep its iterator
// otherwise return empty-one
(i, iter) => if (i >= min & i <= max) iter else Iterator()
)
}
如果输入 RDD
具有复杂的依赖关系,您应该在应用它之前缓存它。