对 pyspark 中的列进行重新分区如何影响分区数?

How does repartitioning on a column in pyspark affect the number of partitions?

我有一个包含一百万条记录的数据框。看起来像这样 -

df.show()

+--------------------+--------------------++-------------
|            feature1|            feature2| domain    |
+--------------------+--------------------++-------------
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain1   | 
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain2   |
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain1   |
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain2   |
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain1   |

spark 中的理想分区大小为 128 MB,假设域列有两个唯一值(domain1 和 domain2),考虑到这一点我有两个问题 -

  1. 如果我这样做 df.repartition("domain") 并且如果一个分区不能容纳特定域密钥的所有数据,应用程序会失败还是会根据需要自动创建适合的分区数据?

  2. 假设上面的数据已经根据domain key进行了重新分区,所以会有两个分区(唯一键是domain1和domain2)。现在假设 domain1 和 domain2 重复 1000000 次,我将根据域进行自连接。因此,对于每个域,我将获得大约 10^12 条记录。考虑到我们有两个分区,并且分区数在连接期间没有改变,这两个新分区是否能够处理 1000000 条记录?

答案取决于您的数据大小。当一个分区不能容纳属于一个分区值的所有数据时(例如domain1),将创建更多分区,最多spark.sql.shuffle.partitions个。如果您的数据太大,即一个分区将超过 2GB 的限制(另请参阅 了解相关解释),重新分区将导致 OutOfMemoryError.
正如提供完整答案的旁注:能够将数据放入一个分区并不一定意味着只为分区值生成一个分区。这取决于 - 除其他外 - 执行者的数量以及之前数据的分区方式。 Spark 会尽量避免不必要的混洗,因此可以为一个分区值生成多个分区。

因此,为了防止作业失败,您应该调整 spark.sql.shuffle.partitions 或将所需的分区数与分区列一起传递给 repartition