使用显式和默认分区器在转换链中进行 Spark 分区

Spark partitioning within transformation chains with explicit and default partitioners

假设我从一开始就对数据进行分区:

即:

val rdd: RDD[Int, String] = ...;
val data = rdd.partitionBy(new RangePartitioner(8, rdd)).persist()

然后假设我做到了:

val groupedData = data.groupByKey() // what is the partitioner here?

通常情况下,groupByKey默认有使用HashPartitioner的分区逻辑——在这种情况下,它会继续使用父分区器,还是会根据默认重新分区数据?

我查看了 Partitioner#defaultPartitioner,情况似乎确实如此(只有在未指定显式分区程序时才使用默认分区逻辑),但我想我会做一个完整性检查。

作为一个附加问题,除了#map、#flatMap 等键更改转换。对于所有保留键的转换,它们是否都保留并传播显式定义的分区程序?如果我从不对数据进行分区,它们会传播先前级别的默认值吗?

即:

rdd.groupByKey() // hash-partitioner by default
.mapValues(_.head)
.sortByKey //range-partitioner by default, but does it use the hash-partitioner from before?

在第一种情况下,分区器将被保留。很容易检查

for {
  p1 <- groupedData.partitioner
  p2 <- data.partitioner
} yield p1 == p2

// Some(true)

在第二种情况下,分区器是 inherent part of the transformation,因此 HashPartitioner 将被丢弃:

val grouped = rdd.groupByKey

for {
  p1 <- grouped.partitioner
  p2 <- grouped.sortByKey().partitioner
} yield p1 == p2

// Some(false)