具有 gzip 格式的大文本文件的 Spark 作业

Spark job with large text file in gzip format

我正在 运行 处理一个 Spark 作业,它处理输入文件的时间太长了。输入文件为 6.8 GB Gzip 格式,包含 1.1 亿行文本。我知道它是 Gzip 格式的,所以它是不可拆分的,只有一个执行程序将用于读取该文件。

作为我调试过程的一部分,我决定看看将 gzip 文件转换为 parquet 需要多长时间。我的想法是,一旦我转换为 parquet 文件,然后如果我 运行 我在该文件上的原始 Spark 作业,在这种情况下它将使用多个执行程序并且输入文件将被并行处理。

但即使是小工作也比我预期的要花更长的时间。这是我的代码:

val input = sqlContext.read.text("input.gz")
input.write.parquet("s3n://temp-output/")

当我在我的笔记本电脑(16 GB RAM)中提取该文件时,它只用了不到 2 分钟。当我 运行 它在 Spark 集群上时,我的期望是它会花费相同甚至更少的时间,因为我使用的执行程序内存是 58 GB。花了大约 20 分钟。

我在这里错过了什么?如果这听起来很业余,我很抱歉,但我是 Spark 的新手。

在 gzip 文件上 运行 Spark 作业的最佳方式是什么?假设我没有选择以其他文件格式(bzip2、snappy、lzo)创建该文件。

在执行输入-处理-输出类型的 Spark 作业时,需要考虑三个不同的问题:

  1. 输入并行度
  2. 处理并行度
  3. 输出并行度

在您的情况下,输入并行度为 1,因为在您的问题中您声称无法更改输入格式或粒度。

您基本上也没有进行任何处理,因此您无法在那里获得任何收益。

但是,您可以控制输出并行度,这会给您带来两个好处:

  • 多个CPU会写,从而减少写操作的总时间。

  • 您的输出将拆分为多个文件,以便您在以后的处理中利用输入并行性。

要增加并行度,您必须增加分区的数量,这可以通过 repartition() 来实现,例如

val numPartitions = ...
input.repartition(numPartitions).write.parquet("s3n://temp-output/")

选择最佳分区数时,需要考虑许多不同的因素。

  • 数据大小
  • 分区偏斜
  • 集群 RAM 大小
  • 集群中的核心数
  • 您将进行的后续处理类型
  • 您将用于后续处理的集群(RAM 和内核)的大小
  • 您正在写入的系统

在不了解您的目标和限制的情况下,很难提出可靠的建议,但这里有一些通用的指导原则:

  • 由于您的分区不会倾斜(上述 repartition 的使用将使用纠正倾斜的散列分区程序),如果您设置数字,您将获得最快的吞吐量分区的数量等于执行器核心的数量,假设您使用的节点具有足够的 I/O.

  • 当您处理数据时,您确实希望整个分区能够 "fit" 在分配给单个执行程序核心的 RAM 中。这里的"fit"是什么意思取决于你的处理。如果您正在进行简单的 map 转换,则可能会流式传输数据。如果您正在做一些涉及订购的事情,那么 RAM 需要大幅增长。如果您使用的是 Spark 1.6+,您将受益于更灵活的内存管理。如果您使用的是早期版本,则必须更加小心。当 Spark 必须启动 "buffering" 到磁盘时,作业执行会停止。磁盘上的大小和内存中的大小可能非常非常不同。后者根据您处理数据的方式以及 Spark 可以从谓词下推中获得多少好处(Parquet 支持)而有所不同。使用 Spark UI 查看各个作业阶段需要多少 RAM。

顺便说一句,除非您的数据具有非常特定的结构,否则不要对分区号进行硬编码,因为这样您的代码将 运行 在不同大小的集群上处于次优状态。相反,使用以下技巧来确定集群中执行程序的数量。然后,您可以根据您使用的机器乘以每个执行程序的内核数。

// -1 is for the driver node
val numExecutors = sparkContext.getExecutorStorageStatus.length - 1

作为参考,在我们的团队中,我们使用相当复杂的数据结构,这意味着 RAM 大小 >> 磁盘大小,我们的目标是将 S3 对象保持在 50-250Mb 范围内以在节点上进行处理其中每个执行器核心都有 10-20Gb RAM。

希望对您有所帮助。