"Un-flatten" spark 中的一个 RDD
"Un-flatten" an RDD in spark
我有一个 RDD[LabeledPoint]
尺寸 N。
我想将其转换为 RDD[Array[LabeledPoint]]
,所有数组的大小大致相同(如果需要,除了一个较小的数组)。
我发现 here 一种方法(对于 RDD[Double]
)遍历 RRD 的分区:
val batchedRDD = rdd.mapPartitions { iter: Iterator[Int] =>
new Iterator[Array[Int]] {
def hasNext: Boolean = iter.hasNext
def next(): Array[Int] = {
iter.take(batchedDegree).toArray
}
}
}
然而,在实践中,由于此方法是分区的,它会创建许多大小(远)小于所需大小的数组。
我考虑过使用 coalesce
来减少分区数量,从而减少较小数组的数量。但这可能会降低我工作后期的速度。
您还有其他想法可以更好地改造 RDD
吗?
您可以使用 rdd.glom()
.
来自 Scala 文档:
/**
* Return an RDD created by coalescing all elements within each
partition into an array.
*/
def glom(): RDD[Array[T]] = withScope
{
new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
我有一个 RDD[LabeledPoint]
尺寸 N。
我想将其转换为 RDD[Array[LabeledPoint]]
,所有数组的大小大致相同(如果需要,除了一个较小的数组)。
我发现 here 一种方法(对于 RDD[Double]
)遍历 RRD 的分区:
val batchedRDD = rdd.mapPartitions { iter: Iterator[Int] =>
new Iterator[Array[Int]] {
def hasNext: Boolean = iter.hasNext
def next(): Array[Int] = {
iter.take(batchedDegree).toArray
}
}
}
然而,在实践中,由于此方法是分区的,它会创建许多大小(远)小于所需大小的数组。
我考虑过使用 coalesce
来减少分区数量,从而减少较小数组的数量。但这可能会降低我工作后期的速度。
您还有其他想法可以更好地改造 RDD
吗?
您可以使用 rdd.glom()
.
来自 Scala 文档:
/**
* Return an RDD created by coalescing all elements within each partition into an array.
*/def glom(): RDD[Array[T]] = withScope { new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray)) }