Why doesn't filter preserve partitioning?

Quoting jaceklaskowski.gitbooks.io:

Some operations, e.g. map, flatMap, filter, don’t preserve partitioning. map, flatMap, filter operations apply a function to every partition.

I don't understand why filter would not preserve partitioning. It just takes a subset of each partition, namely the elements that satisfy the predicate, so I would expect the partitioner to be kept. Why isn't that the case?

You are of course right. The quote is incorrect. filter does preserve partitioning (for the reasons you have already described), and it is trivial to confirm:

// Build a pair RDD keyed by x % 3 and hash-partition it into 11 partitions
val rdd = sc.range(0, 10).map(x => (x % 3, None)).partitionBy(
  new org.apache.spark.HashPartitioner(11)
)

rdd.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)

// The keys produced above are 0, 1 and 2, so filter on a key that actually occurs
val filteredRDD = rdd.filter(_._1 == 2)
filteredRDD.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)

rdd.partitioner == filteredRDD.partitioner
// Boolean = true
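
As an extra sanity check (output indicative, same spark-shell session as above), glom shows that the surviving pairs all sit in the one partition that HashPartitioner(11) assigns to key 2, i.e. nothing was shuffled:

// Collect partition contents; only index 2 (key 2's partition, since 2 % 11 = 2)
// should be non-empty
filteredRDD.glom().collect().zipWithIndex.filter(_._1.nonEmpty)
// Array((Array((2,None), (2,None), (2,None)),2))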

This is in contrast to operations such as map, which do not preserve partitioning (the Partitioner):

rdd.map(identity _).partitioner
// Option[org.apache.spark.Partitioner] = None
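
For completeness: when only the values need transforming, mapValues is the partitioner-preserving alternative to map, since Spark then knows the keys are untouched. A small sketch with the same rdd:

rdd.mapValues(_ => 1).partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)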

Datasets are a bit more subtle, because filters are usually pushed down, but the overall behavior is similar.
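
To illustrate what "pushed down" means, here is a hedged sketch (the path and the key column are made up for the example; spark is the spark-shell session). With a file source such as Parquet, the predicate appears inside the scan node of the physical plan instead of being applied as a separate step afterwards:

import spark.implicits._

// Write a tiny Parquet dataset to an illustrative (hypothetical) path
val path = "/tmp/filter-pushdown-demo"
spark.range(0, 10).withColumn("key", $"id" % 3).write.mode("overwrite").parquet(path)

// The filter is typically pushed into the Parquet scan; look for
// PushedFilters: [..., EqualTo(key,2)] in the FileScan node of the plan
spark.read.parquet(path).filter($"key" === 2).explain()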

filter does preserve partitioning, at least the source code of filter suggests so (preservesPartitioning = true):

  /**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }
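
The same flag is exposed to user code through mapPartitions, where it defaults to preservesPartitioning = false; a minimal sketch reusing the rdd defined above, asserting that our per-partition function leaves the keys intact:

// Like filter, this drops elements within each partition without touching keys,
// so it is safe to promise Spark that the existing partitioner still applies
val kept = rdd.mapPartitions(
  iter => iter.filter(_._1 != 0),
  preservesPartitioning = true
)
kept.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)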