Why doesn't filter preserve partitioning?
Quoting from jaceklaskowski.gitbooks.io:
Some operations, e.g. map, flatMap, filter, don’t preserve partitioning.
map, flatMap, filter operations apply a function to every partition.
I don't understand why filter doesn't preserve partitioning. It just takes a subset of each partition, keeping the elements that satisfy the predicate, so I would think the partitioning could be kept. Why isn't that the case?
You are of course correct. The quote is wrong: filter does preserve partitioning (for exactly the reason you describe), and confirming that is trivial:
val rdd = sc.range(0, 10).map(x => (x % 3, None)).partitionBy(
new org.apache.spark.HashPartitioner(11)
)
rdd.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)
val filteredRDD = rdd.filter(_._1 == 3)
filteredRDD.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)
rdd.partitioner == filteredRDD.partitioner
// Boolean = true
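To see why this is safe, here is a minimal plain-Scala sketch (no Spark; the object name and the partition function, which mimics HashPartitioner's non-negative-modulo scheme, are illustrative assumptions). Filtering runs per partition and never changes a key, so the key-to-partition mapping stays valid and no shuffle is needed:

```scala
object FilterPreservesPartitioning extends App {
  val numPartitions = 11

  // Mimics HashPartitioner: non-negative modulo of the key's hashCode
  def partition(key: Any): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Pairs keyed like the RDD example above: (x % 3, None)
  val data = (0L until 10L).map(x => (x % 3, None))

  // Bucket elements into partitions, as partitionBy would
  val partitions = data.groupBy { case (k, _) => partition(k) }

  // filter runs on each partition's iterator and never touches keys...
  val filtered = partitions.map { case (pid, elems) =>
    pid -> elems.filter { case (k, _) => k == 0L }
  }

  // ...so every surviving element still sits in the partition its key
  // maps to, and the old partitioner remains valid.
  val stillConsistent = filtered.forall { case (pid, elems) =>
    elems.forall { case (k, _) => partition(k) == pid }
  }
  println(stillConsistent)
}
```

This is the invariant behind preservesPartitioning: the operation promises not to change keys, so Spark can carry the parent's partitioner forward.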
This is in contrast to operations such as map, which do not preserve partitioning (the Partitioner is dropped), because map can change the keys:
rdd.map(identity _).partitioner
// Option[org.apache.spark.Partitioner] = None
Datasets are a bit more subtle, because filters are normally pushed down, but overall the behavior is similar.
filter does preserve partitioning; at least the source code of filter suggests so (it passes preservesPartitioning = true):
/**
* Return a new RDD containing only the elements that satisfy a predicate.
*/
def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
(context, pid, iter) => iter.filter(cleanF),
preservesPartitioning = true)
}