使用显式和默认分区器在转换链中进行 Spark 分区
Spark partitioning within transformation chains with explicit and default partitioners
假设我从一开始就对数据进行分区:
即:
val rdd: RDD[Int, String] = ...;
val data = rdd.partitionBy(new RangePartitioner(8, rdd)).persist()
然后假设我做到了:
val groupedData = data.groupByKey() // what is the partitioner here?
通常情况下,groupByKey默认有使用HashPartitioner的分区逻辑——在这种情况下,它会继续使用父分区器,还是会根据默认重新分区数据?
我查看了 Partitioner#defaultPartitioner,情况似乎确实如此(只有在未指定显式分区程序时才使用默认分区逻辑),但我想我会做一个完整性检查。
作为一个附加问题,除了#map、#flatMap 等键更改转换。对于所有保留键的转换,它们是否都保留并传播显式定义的分区程序?如果我从不对数据进行分区,它们会传播先前级别的默认值吗?
即:
rdd.groupByKey() // hash-partitioner by default
.mapValues(_.head)
.sortByKey //range-partitioner by default, but does it use the hash-partitioner from before?
在第一种情况下,分区器将被保留。很容易检查
for {
p1 <- groupedData.partitioner
p2 <- data.partitioner
} yield p1 == p2
// Some(true)
在第二种情况下,分区器是 inherent part of the transformation,因此 HashPartitioner
将被丢弃:
val grouped = rdd.groupByKey
for {
p1 <- grouped.partitioner
p2 <- grouped.sortByKey().partitioner
} yield p1 == p2
// Some(false)
假设我从一开始就对数据进行分区:
即:
val rdd: RDD[Int, String] = ...;
val data = rdd.partitionBy(new RangePartitioner(8, rdd)).persist()
然后假设我做到了:
val groupedData = data.groupByKey() // what is the partitioner here?
通常情况下,groupByKey默认有使用HashPartitioner的分区逻辑——在这种情况下,它会继续使用父分区器,还是会根据默认重新分区数据?
我查看了 Partitioner#defaultPartitioner,情况似乎确实如此(只有在未指定显式分区程序时才使用默认分区逻辑),但我想我会做一个完整性检查。
作为一个附加问题,除了#map、#flatMap 等键更改转换。对于所有保留键的转换,它们是否都保留并传播显式定义的分区程序?如果我从不对数据进行分区,它们会传播先前级别的默认值吗?
即:
rdd.groupByKey() // hash-partitioner by default
.mapValues(_.head)
.sortByKey //range-partitioner by default, but does it use the hash-partitioner from before?
在第一种情况下,分区器将被保留。很容易检查
for {
p1 <- groupedData.partitioner
p2 <- data.partitioner
} yield p1 == p2
// Some(true)
在第二种情况下,分区器是 inherent part of the transformation,因此 HashPartitioner
将被丢弃:
val grouped = rdd.groupByKey
for {
p1 <- grouped.partitioner
p2 <- grouped.sortByKey().partitioner
} yield p1 == p2
// Some(false)