Spark压缩网络和瓶颈
Spark compression network and bottleneck
我正在研究基于描述文件系统的 csv 的 ETL,将其转换为 parquet,这样我就可以轻松地处理它们以提取信息。
我正在使用 Mr. Powers framework Daria 这样做。我已经安静了不同的输入和大量的转换和框架帮助组织代码。
我有一个独立集群 v2.3.2,由 4 个节点组成,每个节点有 8 个内核和 32GB 内存。
存储由安装在所有节点上的 CephFS 卷处理。
首先对我的算法做一个简短的描述(非常简单):
Use SparkContext to load the csv.bz2 file,
Chain a lot of withColumn() statement,
Drop all unnecessary columns,
Write parquet file to CephFS
这种处理可能需要几个小时,具体取决于 CSV 的行数,我想确定 bz2 或网络是否有问题,所以我 运行 进行了以下测试(多次得到一致的结果):
我尝试了以下场景,每个任务有 20 个内核和 2 个内核:
- 从 CephFS 以每个节点 1Gb/s 的连接读取
csv.bz2
:~5 分钟。
- 从 TMPFS 读取
csv.bz2
(设置看起来像共享存储 space):~5 分钟。
- 根据之前的 2 次测试,我得出结论,解压缩文件是瓶颈的一部分,因此我决定解压缩文件并将其也存储在 TMPFS 中,结果:~5.9 分钟。
测试文件有 25'833'369 行,压缩后 370MB,未压缩 3700MB。
这些结果已被多次重现。
我的问题是在这种情况下我的瓶颈是什么?我虽然 RAM 中的未压缩文件会是最快的。
有没有可能是我的程序在读取 CSV 时表现不佳?在集群的执行日志中,我有 5 到 10 秒的最大 GC 时间,时间线主要显示 CPU 时间(没有改组,也没有随机化过载)。
我还注意到在执行过程中从未使用过内存存储。
我从几个小时的研究中了解到,bz2 是唯一可用作 spark 并行化原因输入的真正压缩算法。
编辑
在这种情况下,瓶颈来自我的虚拟机,它们都在同一个管理程序上。
在确定性能瓶颈时,您可以检查的地方是 CPU、磁盘 I/O、网络带宽和内存。在您的情况下,网络和内存似乎可以排除。
我建议你使用一些系统监控软件,如dstat,在每个节点上监控上述四个因素的指标。
如果 所有节点 的 CPU 使用率接近 100%,那么这可能只是您的算法问题,您可能想检查是否可以优化它.
如果 CPU 使用率低,那么我会检查您的文件系统的 I/O 容量。您对未压缩数据运行速度较慢的测试可能是由于更高的 I/O 成本。实际上,大多数人压缩数据是为了性能:解压成本大多低于I/O成本。
如果您发现节点间的资源使用不平衡,则您的数据可能存在偏差。尽管每个节点都可以访问共享存储,但数据仍然具有局部性。您可能会看到某些节点比其对等节点工作时间更长或网络移动过多(如果数据分布良好,您可能会看到很少的网络移动)。
将输入存储在一个单一的大文件中通常不是一个好主意,尤其是在它被存档的情况下。计算的第一步始终是加载它并解压缩,这是顺序的。然后您可以利用反序列化数据的并行性,但要以网络流量为代价(这并不总是有回报)。
我建议将输入文件分成多个块,以便 Spark 可以并行读取它们。
在Spark中处理大型压缩文件时,我们需要考虑压缩文件的因素,如压缩文件的字节大小、压缩率、解压时间、压缩的内存需求和解压的内存需求。
文件大小(以字节为单位)和压缩比 -
Let's take a look on bz2 compressed file processing in spark, its ideal for processing in spark as file is split-able automatically, other advantage in terms of compression ratio is high compared to other compressions except gzip and lzma.
But problem with gzip when processing with spark main concern is its not split-able which will degrade processing time in spark. So compression wise bz2 is ideal for spark processing as it has very high compression ratio.
Where if we consider lzma compression its also better option compare to bz2 and split-able also. But only problem is with compression time is higher.
解压时间-
when it comes to decompression time lzma, it has very fast decompression speed compared to bz2. While others are slower compared to these two.
压缩和解压的内存要求-
where if we compare the memory requirement for compression and decompression for bz4, its less compared to other compression format except gzip.
您可以在下面link,
找到不同压缩格式的基准测试信息
从上面的分析来看,哪一个是最好的压缩取决于用户的需要如果he/she准备在处理时间上妥协并寻找高压缩那么它he/she可以检查更高的压缩格式。
虽然如果用户可以在 storage/memory space 上妥协并希望通过 put he/she 进行更多处理,则可以选择压缩和解压缩速度更快的格式。
But on my opinion about storage/memory space is not an issue when you work on distributed system like spark. Processing through put is more important factor user are more focused and less worried about storage/memory space. Also whenever in spark try to process compressed file it will try to un-compress it first and then process it and if decompression is take more time that would be performance.
我正在研究基于描述文件系统的 csv 的 ETL,将其转换为 parquet,这样我就可以轻松地处理它们以提取信息。
我正在使用 Mr. Powers framework Daria 这样做。我已经安静了不同的输入和大量的转换和框架帮助组织代码。
我有一个独立集群 v2.3.2,由 4 个节点组成,每个节点有 8 个内核和 32GB 内存。
存储由安装在所有节点上的 CephFS 卷处理。
首先对我的算法做一个简短的描述(非常简单):
Use SparkContext to load the csv.bz2 file,
Chain a lot of withColumn() statement,
Drop all unnecessary columns,
Write parquet file to CephFS
这种处理可能需要几个小时,具体取决于 CSV 的行数,我想确定 bz2 或网络是否有问题,所以我 运行 进行了以下测试(多次得到一致的结果):
我尝试了以下场景,每个任务有 20 个内核和 2 个内核:
- 从 CephFS 以每个节点 1Gb/s 的连接读取
csv.bz2
:~5 分钟。 - 从 TMPFS 读取
csv.bz2
(设置看起来像共享存储 space):~5 分钟。 - 根据之前的 2 次测试,我得出结论,解压缩文件是瓶颈的一部分,因此我决定解压缩文件并将其也存储在 TMPFS 中,结果:~5.9 分钟。
测试文件有 25'833'369 行,压缩后 370MB,未压缩 3700MB。 这些结果已被多次重现。
我的问题是在这种情况下我的瓶颈是什么?我虽然 RAM 中的未压缩文件会是最快的。
有没有可能是我的程序在读取 CSV 时表现不佳?在集群的执行日志中,我有 5 到 10 秒的最大 GC 时间,时间线主要显示 CPU 时间(没有改组,也没有随机化过载)。
我还注意到在执行过程中从未使用过内存存储。
我从几个小时的研究中了解到,bz2 是唯一可用作 spark 并行化原因输入的真正压缩算法。
编辑
在这种情况下,瓶颈来自我的虚拟机,它们都在同一个管理程序上。
在确定性能瓶颈时,您可以检查的地方是 CPU、磁盘 I/O、网络带宽和内存。在您的情况下,网络和内存似乎可以排除。
我建议你使用一些系统监控软件,如dstat,在每个节点上监控上述四个因素的指标。
如果 所有节点 的 CPU 使用率接近 100%,那么这可能只是您的算法问题,您可能想检查是否可以优化它.
如果 CPU 使用率低,那么我会检查您的文件系统的 I/O 容量。您对未压缩数据运行速度较慢的测试可能是由于更高的 I/O 成本。实际上,大多数人压缩数据是为了性能:解压成本大多低于I/O成本。
如果您发现节点间的资源使用不平衡,则您的数据可能存在偏差。尽管每个节点都可以访问共享存储,但数据仍然具有局部性。您可能会看到某些节点比其对等节点工作时间更长或网络移动过多(如果数据分布良好,您可能会看到很少的网络移动)。
将输入存储在一个单一的大文件中通常不是一个好主意,尤其是在它被存档的情况下。计算的第一步始终是加载它并解压缩,这是顺序的。然后您可以利用反序列化数据的并行性,但要以网络流量为代价(这并不总是有回报)。
我建议将输入文件分成多个块,以便 Spark 可以并行读取它们。
在Spark中处理大型压缩文件时,我们需要考虑压缩文件的因素,如压缩文件的字节大小、压缩率、解压时间、压缩的内存需求和解压的内存需求。
文件大小(以字节为单位)和压缩比 -
Let's take a look on bz2 compressed file processing in spark, its ideal for processing in spark as file is split-able automatically, other advantage in terms of compression ratio is high compared to other compressions except gzip and lzma.
But problem with gzip when processing with spark main concern is its not split-able which will degrade processing time in spark. So compression wise bz2 is ideal for spark processing as it has very high compression ratio.
Where if we consider lzma compression its also better option compare to bz2 and split-able also. But only problem is with compression time is higher.
解压时间-
when it comes to decompression time lzma, it has very fast decompression speed compared to bz2. While others are slower compared to these two.
压缩和解压的内存要求-
where if we compare the memory requirement for compression and decompression for bz4, its less compared to other compression format except gzip.
您可以在下面link,
找到不同压缩格式的基准测试信息从上面的分析来看,哪一个是最好的压缩取决于用户的需要如果he/she准备在处理时间上妥协并寻找高压缩那么它he/she可以检查更高的压缩格式。
虽然如果用户可以在 storage/memory space 上妥协并希望通过 put he/she 进行更多处理,则可以选择压缩和解压缩速度更快的格式。
But on my opinion about storage/memory space is not an issue when you work on distributed system like spark. Processing through put is more important factor user are more focused and less worried about storage/memory space. Also whenever in spark try to process compressed file it will try to un-compress it first and then process it and if decompression is take more time that would be performance.