Pyspark:重新分区与 partitionBy

Pyspark: repartition vs partitionBy

我现在正在研究这两个概念,希望得到一些澄清。通过命令行工作,我一直在尝试找出差异以及开发人员何时使用 repartition 与 partitionBy。

下面是一些示例代码:

rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 1), ('b', 3), ('c',1), ('ef',5)])
rdd1 = rdd.repartition(4)
rdd2 = rdd.partitionBy(4)

rdd1.glom().collect()
[[('b', 1), ('ef', 5)], [], [], [('a', 1), ('a', 2), ('b', 3), ('c', 1)]]

rdd2.glom().collect()
[[('a', 1), ('a', 2)], [], [('c', 1)], [('b', 1), ('b', 3), ('ef', 5)]]

我查看了两者的实现,我注意到的唯一区别大部分是 partitionBy 可以采用分区函数,或者默认使用 portable_hash。所以在 partitionBy 中,所有相同的键都应该在同一个分区中。在重新分区时,我希望值在分区上分布得更均匀,但事实并非如此。

鉴于此,为什么会有人使用重新分区?我想我唯一能看到它被使用的情况是我没有使用 PairRDD,或者我有大数据倾斜?

有没有我遗漏的东西,或者有人可以从不同的角度为我阐明?

repartition 已经存在于 RDD 中,并且不按键(或除排序以外的任何其他标准)处理分区。现在 PairRDDs 添加了键的概念,随后添加了另一种允许按该键进行分区的方法。

所以是的,如果您的数据是键控的,您绝对应该按该键进行分区,这在许多情况下是首先使用 PairRDD 的要点(用于连接、reduceByKey 等)。

repartition() 用于指定分区数,考虑到核心数和您拥有的数据量。

partitionBy() 用于提高洗牌函数的效率,例如 reduceByKey()join()cogroup() 等。它仅在 RDD多次使用,所以后面通常跟persist().

两者在动作上的区别:

pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1, 5, 6, 7, 7, 5, 5, 6, 4]).map(lambda x: (x, x))

pairs.partitionBy(3).glom().collect()
[[(3, 3), (6, 6), (6, 6)],
 [(1, 1), (4, 4), (4, 4), (1, 1), (7, 7), (7, 7), (4, 4)],
 [(2, 2), (2, 2), (5, 5), (5, 5), (5, 5)]]

pairs.repartition(3).glom().collect()
[[(4, 4), (2, 2), (6, 6), (7, 7), (5, 5), (5, 5)],
 [(1, 1), (4, 4), (6, 6), (4, 4)],
 [(2, 2), (3, 3), (1, 1), (5, 5), (7, 7)]]