如何在不重新分区和 copyMerge 的情况下合并 spark 结果文件?

How can I merge spark results files without repartition and copyMerge?

我使用下一个代码:

csv.saveAsTextFile(pathToResults, classOf[GzipCodec])

pathToResults 目录有很多文件,如 part-0000、part-0001 等。 我可以使用FileUtil.copyMerge(),但它真的很慢,它会下载驱动程序上的所有文件,然后将它们上传到hadoop 中。但是 FileUtil.copyMerge() 比

csv.repartition(1).saveAsTextFile(pathToResults, classOf[GzipCodec])

如何在不重新分区和 FileUtil.copyMerge() 的情况下合并 spark 结果文件?

不幸的是,没有其他选项可以在 Spark 中获取单个输出文件。您可以使用 coalesce(1) 而不是 repartition(1),但是使用参数 1 它们的行为是相同的。 Spark 会将您的数据收集在内存中的单个分区中,如果您的数据太大,可能会导致 OOM 错误。

在 HDFS 上合并文件的另一种选择可能是编写一个简单的 MapReduce 作业(或 Pig 作业或 Hadoop Streaming 作业),它将整个目录作为输入,并使用单个 reducer 生成单个输出文件.但请注意,使用 MapReduce 方法,所有数据将首先复制到 reducer 本地文件系统,这可能会导致 "out of space" 错误。

以下是关于同一主题的一些有用链接:

  • merge output files after reduce phase
  • Merging hdfs files
  • Merging multiple files into one within Hadoop

有完全相同的问题并且不得不编写实现 copyMerge 的 pySpark 代码(调用 Hadoop API):

https://github.com/Tagar/stuff/blob/master/copyMerge.py

不幸的是,copyMerge 作为独立的 Hadoop API 调用将在 Hadoop 3.0 中被弃用和删除。所以这个实现不依赖于Hadoop的copyMerge(它重新实现了它)。

coalesce(1) 工作正常。我还看到 hadoop-streaming 选项可以动态合并 HDFS 文件,如果你想 运行 这个脚本:

$ hadoop jar /usr/hdp/2.3.2.0-2950/hadoop-mapreduce/hadoop-streaming-2.7.1.2.3.2.0-2950.jar \
                   -Dmapred.reduce.tasks=1 \
                   -input "/hdfs/input/dir" \
                   -output "/hdfs/output/dir" \
                   -mapper cat \
                   -reducer cat