使用 Spark 的 partitionBy 方法对 S3 中的大型倾斜数据集进行分区

Partitioning a large skewed dataset in S3 with Spark's partitionBy method

我正在尝试使用 Spark 将大型分区数据集写入磁盘,但 partitionBy 算法在我尝试过的两种方法中都遇到了困难。

分区严重倾斜 - 有些分区很大,有些很小。

问题 #1:

当我在 partitionBy 之前使用 repartition 时,Spark 将所有分区写入单个文件,即使是巨大的分区也是如此

val df = spark.read.parquet("some_data_lake")
df
  .repartition('some_col).write.partitionBy("some_col")
  .parquet("partitioned_lake")

这需要很长时间才能执行,因为 Spark 不会并行写入大分区。如果其中一个分区有 1TB 的数据,Spark 将尝试将整个 1TB 的数据写入一个文件。

问题 #2:

当我不使用 repartition 时,Spark 写出太多文件。

此代码将写出数量惊人的文件。

df.write.partitionBy("some_col").parquet("partitioned_lake")

我 运行 在一个 8 GB 的小数据子集上,Spark 写出了 85,000 多个文件!

当我在生产数据集上尝试 运行 时,一个包含 1.3 GB 数据的分区被写为 3,100 个文件。

我想要什么

我希望每个分区都写成 1 GB 的文件。因此,具有 7 GB 数据的分区将作为 7 个文件写出,而具有 0.3 GB 数据的分区将作为单个文件写出。

我最好的前进道路是什么?

最简单的解决方案是向 repartition 添加一个或多个列并显式设置分区数。

val numPartitions = ???

df.repartition(numPartitions, $"some_col", $"some_other_col")
 .write.partitionBy("some_col")
 .parquet("partitioned_lake")

其中:

  • numPartitions - 应该是写入分区目录的所需文件数的上限(实际数量可以更低)。
  • $"some_other_col"(和可选的附加列)应该具有高基数并且独立于 $"some_column(这两者之间应该有功能依赖,而不应该是高度相关)。

    如果数据不包含此类列,您可以使用 o.a.s.sql.functions.rand

    import org.apache.spark.sql.functions.rand
    
    df.repartition(numPartitions, $"some_col", rand)
      .write.partitionBy("some_col")
      .parquet("partitioned_lake")
    

I'd like for each partition to get written out as 1 GB files. So a partition that has 7 GB of data will get written out as 7 files and a partition that has 0.3 GB of data will get written out as a single file.

目前接受的答案在大多数情况下可能已经足够好了,但并不能完全满足将 0.3 GB 分区写入单个文件的要求。相反,它将为每个输出分区目录写出 numPartitions 个文件,包括 0.3 GB 分区。

您正在寻找的是一种根据数据分区的大小动态调整输出文件数量的方法。为此,我们将以 10465355 的方法为基础,使用 rand() 来控制 repartition() 的行为,并根据我们希望该分区的文件数量扩展 rand() 的范围.

很难通过输出文件大小来控制分区行为,因此我们将使用每个输出文件所需的近似行数来控制它。

我将在 Python 中提供一个演示,但方法在 Scala 中基本相同。

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

spark = SparkSession.builder.getOrCreate()
skewed_data = (
    spark.createDataFrame(
        [(1,)] * 100 + [(2,)] * 10 + [(3,), (4,), (5,)],
        schema=['id'],
    )
)
partition_by_columns = ['id']
desired_rows_per_output_file = 10

partition_count = skewed_data.groupBy(partition_by_columns).count()

partition_balanced_data = (
    skewed_data
    .join(partition_count, on=partition_by_columns)
    .withColumn(
        'repartition_seed',
        (
            rand() * partition_count['count'] / desired_rows_per_output_file
        ).cast('int')
    )
    .repartition(*partition_by_columns, 'repartition_seed')
)

这种方法将平衡输出文件的大小,无论分区大小有多大偏差。每个数据分区将获得它需要的文件数,以便每个输出文件大致具有请求的行数。

此方法的先决条件是计算每个分区的大小,您可以在 partition_count 中看到。如果你真的想动态扩展每个分区的输出文件数量,这是不可避免的。

为了证明这是正确的做法,让我们检查一下分区内容:

from pyspark.sql.functions import spark_partition_id

(
    skewed_data
    .groupBy('id')
    .count()
    .orderBy('id')
    .show()
)

(
    partition_balanced_data
    .select(
        *partition_by_columns,
        spark_partition_id().alias('partition_id'),
    )
    .groupBy(*partition_by_columns, 'partition_id')
    .count()
    .orderBy(*partition_by_columns, 'partition_id')
    .show(30)
)

输出如下:

+---+-----+
| id|count|
+---+-----+
|  1|  100|
|  2|   10|
|  3|    1|
|  4|    1|
|  5|    1|
+---+-----+

+---+------------+-----+
| id|partition_id|count|
+---+------------+-----+
|  1|           7|    9|
|  1|          49|    6|
|  1|          53|   14|
|  1|         117|   12|
|  1|         126|   10|
|  1|         136|   11|
|  1|         147|   15|
|  1|         161|    7|
|  1|         177|    7|
|  1|         181|    9|
|  2|          85|   10|
|  3|          76|    1|
|  4|         197|    1|
|  5|          10|    1|
+---+------------+-----+

根据需要,每个输出文件大约有 10 行。 id=1获得10个分区,id=2获得1个分区,id={3,4,5}各获得1个分区。

此解决方案平衡输出文件大小,无论数据倾斜如何,并且不限制并行度

Nick Chammas 方法的替代方法是创建一个按主分区键分区的 row_number() 列,然后将其除以您希望在每个分区中显示的确切记录数。在 SPARK SQL 中表示如下:

SELECT /*+ REPARTITION(id, file_num) */
  id,
  FLOOR(ROW_NUMBER() OVER(PARTITION BY id ORDER BY NULL) / rows_per_file) AS file_num
FROM skewed_data

这样做的额外好处是,它允许您通过在辅助键上使用 ORDER BY 子句,跨文件共置一个分区中的 多数 数据.如果与辅助键关联的行号跨越两个 file_num 值,则不能保证辅助键位于同一位置。也有可能,实际上有点可能,最终得到一个文件,每个分区中只有很少的记录。