在 Spark 中处理大型 gzip 文件

Dealing with a large gzipped file in Spark

我有一个来自 s3 的大型(压缩后约 85 GB)gzip 文件,我正尝试在 AWS EMR 上使用 Spark 进行处理(现在每个实例都有一个 m4.xlarge 主实例和两个 m4.10xlarge 核心实例具有 100 GB EBS 卷)。我知道 gzip 是一种不可拆分的文件格式,并且 I've seen it suggested 人们应该对压缩文件进行重新分区,因为 Spark 最初给出的 RDD 具有一个分区。然而,在做了

scala> val raw = spark.read.format("com.databricks.spark.csv").
     | options(Map("delimiter" -> "\t", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")).
     | load("s3://path/to/file.gz").
     | repartition(sc.defaultParallelism * 3)
raw: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: string, _c1: string ... 48 more fields
scala> raw.count()

并查看 Spark 应用程序 UI,我仍然看到只有一个活跃的执行者(其他 14 个已经死了)有一个任务,而且这个工作永远不会完成(或者至少我没有等了足够长的时间)。

如果文件格式不可拆分,则无法避免在一个核心上读取整个文件。为了并行化工作,您必须知道如何将工作块分配给不同的计算机。在 gzip 案例中,假设您将其分成 128M 块。第n个chunk根据第n-1个chunk的位置信息知道如何解压,这取决于第n-2个chunk,依此类推到第一个。

如果要并行化,则需要使该文件可拆分。一种方法是将其解压缩并在未压缩的情况下对其进行处理,或者您可以将其解压缩,将其拆分为多个文件(一个文件用于您想要的每个并行任务),然后对每个文件进行 gzip。

我遇到过这个问题,这里是解决方案。

解决此问题的最佳方法是在我们的 Spark 批处理 运行 之前解压缩 .gz 文件。然后使用这个解压文件,之后我们就可以使用Spark并行了。

解压 .gz 文件的代码。

import gzip
import shutil
with open('file.txt.gz', 'rb') as f_in, gzip.open('file.txt', 'wb') as f_out:
    shutil.copyfileobj(f_in, f_out)

Spark 无法并行读取单个 gzip 文件。

您最好将它分成 gzip 压缩的块。

但是,Spark 在读取 gzip 文件时真的很慢。您可以这样做来加快速度:

file_names_rdd = sc.parallelize(list_of_files, 100)
lines_rdd = file_names_rdd.flatMap(lambda _: gzip.open(_).readlines())

通过 Python 的速度是读取本机 Spark gzip reader 的两倍。

我使用的解决方案是解压缩编解码器:Niels Basjes 的SplittableGZipCodec。此编解码器会将同一个文件提供给多个 spark 任务。每个任务将 'fast forward' 或寻找 gzip 文件中的特定偏移量,然后从那里开始解压缩。它在同一个 gzip 文件上运行多个任务,显着减少了挂钟时间,增加了 gunzip 成功的机会,同时增加了使用的总核心小时数的小成本。杰出的。我已经在 20-50GB 的文件上对其进行了测试。

这里详细介绍spark方案: https://github.com/nielsbasjes/splittablegzip/blob/master/README-Spark.md

# splittable-gzip.py
from pyspark.sql import SparkSession


if __name__ == '__main__':
    spark = (
        SparkSession.builder
        # If you want to change the split size, you need to use this config
        # instead of mapreduce.input.fileinputformat.split.maxsize.
        # I don't think Spark DataFrames offer an equivalent setting for
        # mapreduce.input.fileinputformat.split.minsize.
        .config('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
        .getOrCreate()
    )

    print(
        spark.read
        # You can also specify this option against the SparkSession.
        .option('io.compression.codecs', 'nl.basjes.hadoop.io.compress.SplittableGzipCodec')
        .csv(...)
        .count()
    )