数据块可扩展集群的 Spark 重新分区逻辑
Spark re partition logic for databricks scalable cluster
Databricks spark 集群可以 auto-scale 根据负载。
我正在 spark 中读取 gzip 文件并在 rdd 上进行重新分区以获得并行性,至于 gzip 文件,它将在 signle 核心上读取并生成 rdd with one partition。
根据 this post 理想的分区数是我可以在重新分区期间设置的集群中的核心数,但在自动缩放集群的情况下,该数字将根据集群的状态以及如何变化而变化里面有很多执行者。
那么,自动扩展的 spark 集群的分区逻辑应该是什么?
编辑 1:
文件夹在不断增大,gzip 文件会定期出现在其中,gzip 文件的大小约为 10GB,未压缩的大小约为 150GB。我知道可以并行读取多个文件。但是对于单个超大文件数据块可能会尝试自动扩展集群,但是即使在扩展集群中的核心后增加了,我的数据帧的分区数量也会减少(基于之前的集群状态,它可能具有较少的核心) .
即使我的集群会自动扩展(scale out),处理也会被限制在我做的分区数量
num_partitions = <cluster cores before scaling>
df.repartition(num_partitions)
对于可拆分的 file/data,分区将主要根据核心、操作是窄还是宽、文件大小等自动创建。分区也可以使用 coalesce
和 [=11 以编程方式控制=].但是对于一个 gzip/un-splittable 文件,一个文件只有 1 个任务,并且它可以与可用的内核一样多(就像你说的那样)。
对于动态集群,您可以选择将作业指向包含大量 gzip 文件的 folder/bucket。假设您有 1000 个文件要处理,并且您有 10 个内核,那么 10 个将并行处理。当您的集群动态增加到 20 时,20 将 运行 并行。这是自动发生的,您不需要为此编写代码。唯一的问题是您不能缩放比可用内核更少的文件。这是不可拆分文件的一个已知缺陷。
另一种选择是根据可用文件的数量和大小来定义作业的簇大小。您可以找到基于历史 运行 时间的经验公式。假设您有 5 个大文件和 10 个小文件(大文件的一半大小),那么您可以分配 20 个核心 (10 + 2*5) 以有效地使用集群资源。
标准的 gzip 文件是不可分割的,因此 Spark 将只用一个核心、一个任务来处理 gzip 文件,无论您的设置是什么 [从 Spark 2.4.5/3.0 开始]。希望世界在创建大文件时转向 bzip2 或其他可拆分压缩技术。
如果您直接将数据写入 Parquet,您最终会得到一个可拆分的 parquet 文件。这将由单个核心写出。
如果卡在默认的gzip codec,最好读取后重新分区,写出多个parquet文件。
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
schema = StructType([
StructField("a",IntegerType(),True),
StructField("b",DoubleType(),True),
StructField("c",DoubleType(),True)])
input_path = "s3a://mybucket/2G_large_csv_gzipped/onebillionrows.csv.gz"
spark.conf.set('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
df_two = spark.read.format("csv").schema(schema).load(input_path)
df_two.repartition(32).write.format("parquet").mode("overwrite").save("dbfs:/tmp/spark_gunzip_default_remove_me")
我最近发现了一个可拆分的 gzip 编解码器,初步测试非常有前途。这个编解码器实际上多次读取文件,每个任务提前扫描一些字节数(w/o 解压)然后开始解压。
当需要将数据帧作为镶木地板文件写出时,这样做的好处就会得到回报。您最终会得到多个文件,全部并行写入,以获得更大的吞吐量和更短的挂钟时间(您的 CPU 小时会更长)。
参考:https://github.com/nielsbasjes/splittablegzip/blob/master/README-Spark.md
我的测试用例:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
schema = StructType([
StructField("a",IntegerType(),True),
StructField("b",DoubleType(),True),
StructField("c",DoubleType(),True)])
input_path = "s3a://mybucket/2G_large_csv_gzipped/onebillionrows.csv.gz"
spark.conf.set('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
df_gz_codec = (spark.read
.option('io.compression.codecs', 'nl.basjes.hadoop.io.compress.SplittableGzipCodec')
.schema(schema)
.csv(input_path)
)
df_gz_codec.write.format("parquet").save("dbfs:/tmp/gunzip_to_parquet_remove_me")
Databricks spark 集群可以 auto-scale 根据负载。
我正在 spark 中读取 gzip 文件并在 rdd 上进行重新分区以获得并行性,至于 gzip 文件,它将在 signle 核心上读取并生成 rdd with one partition。
根据 this post 理想的分区数是我可以在重新分区期间设置的集群中的核心数,但在自动缩放集群的情况下,该数字将根据集群的状态以及如何变化而变化里面有很多执行者。
那么,自动扩展的 spark 集群的分区逻辑应该是什么?
编辑 1:
文件夹在不断增大,gzip 文件会定期出现在其中,gzip 文件的大小约为 10GB,未压缩的大小约为 150GB。我知道可以并行读取多个文件。但是对于单个超大文件数据块可能会尝试自动扩展集群,但是即使在扩展集群中的核心后增加了,我的数据帧的分区数量也会减少(基于之前的集群状态,它可能具有较少的核心) .
即使我的集群会自动扩展(scale out),处理也会被限制在我做的分区数量
num_partitions = <cluster cores before scaling>
df.repartition(num_partitions)
对于可拆分的 file/data,分区将主要根据核心、操作是窄还是宽、文件大小等自动创建。分区也可以使用 coalesce
和 [=11 以编程方式控制=].但是对于一个 gzip/un-splittable 文件,一个文件只有 1 个任务,并且它可以与可用的内核一样多(就像你说的那样)。
对于动态集群,您可以选择将作业指向包含大量 gzip 文件的 folder/bucket。假设您有 1000 个文件要处理,并且您有 10 个内核,那么 10 个将并行处理。当您的集群动态增加到 20 时,20 将 运行 并行。这是自动发生的,您不需要为此编写代码。唯一的问题是您不能缩放比可用内核更少的文件。这是不可拆分文件的一个已知缺陷。
另一种选择是根据可用文件的数量和大小来定义作业的簇大小。您可以找到基于历史 运行 时间的经验公式。假设您有 5 个大文件和 10 个小文件(大文件的一半大小),那么您可以分配 20 个核心 (10 + 2*5) 以有效地使用集群资源。
标准的 gzip 文件是不可分割的,因此 Spark 将只用一个核心、一个任务来处理 gzip 文件,无论您的设置是什么 [从 Spark 2.4.5/3.0 开始]。希望世界在创建大文件时转向 bzip2 或其他可拆分压缩技术。
如果您直接将数据写入 Parquet,您最终会得到一个可拆分的 parquet 文件。这将由单个核心写出。 如果卡在默认的gzip codec,最好读取后重新分区,写出多个parquet文件。
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
schema = StructType([
StructField("a",IntegerType(),True),
StructField("b",DoubleType(),True),
StructField("c",DoubleType(),True)])
input_path = "s3a://mybucket/2G_large_csv_gzipped/onebillionrows.csv.gz"
spark.conf.set('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
df_two = spark.read.format("csv").schema(schema).load(input_path)
df_two.repartition(32).write.format("parquet").mode("overwrite").save("dbfs:/tmp/spark_gunzip_default_remove_me")
我最近发现了一个可拆分的 gzip 编解码器,初步测试非常有前途。这个编解码器实际上多次读取文件,每个任务提前扫描一些字节数(w/o 解压)然后开始解压。 当需要将数据帧作为镶木地板文件写出时,这样做的好处就会得到回报。您最终会得到多个文件,全部并行写入,以获得更大的吞吐量和更短的挂钟时间(您的 CPU 小时会更长)。
参考:https://github.com/nielsbasjes/splittablegzip/blob/master/README-Spark.md
我的测试用例:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
schema = StructType([
StructField("a",IntegerType(),True),
StructField("b",DoubleType(),True),
StructField("c",DoubleType(),True)])
input_path = "s3a://mybucket/2G_large_csv_gzipped/onebillionrows.csv.gz"
spark.conf.set('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
df_gz_codec = (spark.read
.option('io.compression.codecs', 'nl.basjes.hadoop.io.compress.SplittableGzipCodec')
.schema(schema)
.csv(input_path)
)
df_gz_codec.write.format("parquet").save("dbfs:/tmp/gunzip_to_parquet_remove_me")