Spark on Cluster:读取大量小的 avro 文件需要很长时间才能列出

Spark on Cluster: Read Large number of small avro files is taking too long to list

我知道这个在HDFS中读取大量小文件的问题一直是一个问题并且被广泛讨论,但请耐心等待。大多数处理此类问题的 Whosebug 问题都与读取大量 txt 有关 files.I'm trying to read a large number of small avro files

加上这些读取 txt 文件的解决方案讨论了使用 WholeTextFileInputFormat 或 CombineInputFormat (),它们是 RDD 实现,我使用的是 Spark 2.4 (HDFS 3.0.0),RDD 实现通常是不鼓励的,数据帧是首选。我更喜欢使用数据帧,但也对 RDD 实现持开放态度。

我已经尝试按照 Murtaza 的建议合并数据帧,但是在大量文件上出现 OOM 错误 ()

我正在使用以下代码

val filePaths = avroConsolidator.getFilesInDateRangeWithExtension //pattern:filePaths: Array[String] 
//I do need to create a list of file paths as I need to filter files based on file names. Need this logic for some upstream process
//example : Array("hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1530.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1531.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1532.avro")
val df_mid = sc.read.format("com.databricks.spark.avro").load(filePaths: _*)
      val df = df_mid
        .withColumn("dt", date_format(df_mid.col("timeStamp"), "yyyy-MM-dd"))
        .filter("dt != 'null'")

      df
        .repartition(partitionColumns(inputs.logSubType).map(new org.apache.spark.sql.Column(_)):_*)
        .write.partitionBy(partitionColumns(inputs.logSubType): _*)
        .mode(SaveMode.Append)
        .option("compression","snappy")
        .parquet(avroConsolidator.parquetFilePath.toString)

在作业级别列出 183 个小文件用了 1.6 分钟

奇怪的是我的舞台 UI 页面只显示 3s(不明白为什么)

avro 文件存储在 yyyy/mm/dd 个分区中:hdfs://server123:8020/source/Avro/weblog/2019/06/03

有什么方法可以加快叶文件的列表,正如您从屏幕截图中看到的那样,只需 6 秒即可整合到镶木地板文件中,但列出文件需要 1.3 分钟

由于读取大量小文件的时间太长,我退后一步,使用 CombineFileInputFormat 创建 RDD。 This InputFormat 适用于小文件,因为它将许多小文件打包成一个分割,因此映射器更少,每个映射器有更多数据要处理。

这是我所做的:

def createDataFrame(filePaths: Array[Path], sc: SparkSession, inputs: AvroConsolidatorInputs): DataFrame = {

   val job: Job = Job.getInstance(sc.sparkContext.hadoopConfiguration)
   FileInputFormat.setInputPaths(job, filePaths: _*)
   val sqlType = SchemaConverters.toSqlType(getSchema(inputs.logSubType))

   val rddKV = sc.sparkContext.newAPIHadoopRDD(
                   job.getConfiguration,
                   classOf[CombinedAvroKeyInputFormat[GenericRecord]],
                   classOf[AvroKey[GenericRecord]],
                   classOf[NullWritable])

   val rowRDD = rddKV.mapPartitions(
                  f = (iter: Iterator[(AvroKey[GenericRecord], NullWritable)]) =>
                       iter.map(_._1.datum()).map(genericRecordToRow(_, sqlType))
                       , preservesPartitioning = true)

   val df = sc.sqlContext.createDataFrame(rowRDD , 
              sqlType.dataType.asInstanceOf[StructType])
   df

CombinedAvroKeyInputFormat 是用户定义的 class,它扩展了 CombineFileInputFormat 并将 64MB 的数据放入单个拆分中。

object CombinedAvroKeyInputFormat {

  class CombinedAvroKeyRecordReader[T](var inputSplit: CombineFileSplit, context: TaskAttemptContext, idx: Integer)
    extends AvroKeyRecordReader[T](AvroJob.getInputKeySchema(context.getConfiguration))
  {
    @throws[IOException]
    @throws[InterruptedException]
    override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
      this.inputSplit = inputSplit.asInstanceOf[CombineFileSplit]
      val fileSplit = new FileSplit(this.inputSplit.getPath(idx),
                                    this.inputSplit.getOffset(idx),
                                    this.inputSplit.getLength(idx),
                                    this.inputSplit.getLocations)
      super.initialize(fileSplit, context)
    }
  }

}

/*
 * The class CombineFileInputFormat is an abstract class with no implementation, so we must create a subclass to support it;
 * We’ll name the subclass CombinedAvroKeyInputFormat. The subclass will initiate a delegate CombinedAvroKeyRecordReader that extends AvroKeyRecordReader
 */

class CombinedAvroKeyInputFormat[T] extends CombineFileInputFormat[AvroKey[T], NullWritable] {
  val logger = Logger.getLogger(AvroConsolidator.getClass)
  setMaxSplitSize(67108864)
  def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[AvroKey[T], NullWritable] = {
    val c = classOf[CombinedAvroKeyInputFormat.CombinedAvroKeyRecordReader[_]]
    val inputSplit = split.asInstanceOf[CombineFileSplit]

    /*
     * CombineFileRecordReader is a built in class that pass each split to our class CombinedAvroKeyRecordReader
     * When the hadoop job starts, CombineFileRecordReader reads all the file sizes in HDFS that we want it to process,
     * and decides how many splits base on the MaxSplitSize
     */
    return new CombineFileRecordReader[AvroKey[T], NullWritable](
      inputSplit,
      context,
      c.asInstanceOf[Class[_ <: RecordReader[AvroKey[T], NullWritable]]])
  }
}

这使得小文件的读取速度更快

我在从 AWS S3 读取 100 多个小 avro 文件时遇到了类似的问题:

spark.read.format("avro").load(<file_directory_path_containing_many_avro_files>)

在完成大部分计划任务后,作业会在不同的时间点挂起。例如,它将 运行 在 25 秒内快速完成 111 项任务中的 110 项,并在 110 处挂起一次,而在下一次尝试时,它将在 111 项任务中的第 98 项任务处挂起。它没有通过挂点。

在此处阅读类似问题后:https://blog.yuvalitzchakov.com/leveraging-spark-speculation-to-identify-and-re-schedule-slow-running-tasks/

此处引用spark配置指南:

spark configuration guide

虽然不能解决挂起的最初原因,但下面的 spark 配置被证明是一种快速修复和解决方法。

spark.speculation 设置为 true 解决了这个问题。