Spark on Cluster:读取大量小的 avro 文件需要很长时间才能列出
Spark on Cluster: Read Large number of small avro files is taking too long to list
我知道这个在HDFS中读取大量小文件的问题一直是一个问题并且被广泛讨论,但请耐心等待。大多数处理此类问题的 Whosebug 问题都与读取大量 txt 有关 files.I'm trying to read a large number of small avro files
加上这些读取 txt 文件的解决方案讨论了使用 WholeTextFileInputFormat 或 CombineInputFormat (),它们是 RDD 实现,我使用的是 Spark 2.4 (HDFS 3.0.0),RDD 实现通常是不鼓励的,数据帧是首选。我更喜欢使用数据帧,但也对 RDD 实现持开放态度。
我已经尝试按照 Murtaza 的建议合并数据帧,但是在大量文件上出现 OOM 错误 ()
我正在使用以下代码
val filePaths = avroConsolidator.getFilesInDateRangeWithExtension //pattern:filePaths: Array[String]
//I do need to create a list of file paths as I need to filter files based on file names. Need this logic for some upstream process
//example : Array("hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1530.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1531.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1532.avro")
val df_mid = sc.read.format("com.databricks.spark.avro").load(filePaths: _*)
val df = df_mid
.withColumn("dt", date_format(df_mid.col("timeStamp"), "yyyy-MM-dd"))
.filter("dt != 'null'")
df
.repartition(partitionColumns(inputs.logSubType).map(new org.apache.spark.sql.Column(_)):_*)
.write.partitionBy(partitionColumns(inputs.logSubType): _*)
.mode(SaveMode.Append)
.option("compression","snappy")
.parquet(avroConsolidator.parquetFilePath.toString)
在作业级别列出 183 个小文件用了 1.6 分钟
奇怪的是我的舞台 UI 页面只显示 3s(不明白为什么)
avro 文件存储在 yyyy/mm/dd 个分区中:hdfs://server123:8020/source/Avro/weblog/2019/06/03
有什么方法可以加快叶文件的列表,正如您从屏幕截图中看到的那样,只需 6 秒即可整合到镶木地板文件中,但列出文件需要 1.3 分钟
由于读取大量小文件的时间太长,我退后一步,使用 CombineFileInputFormat 创建 RDD。 This InputFormat 适用于小文件,因为它将许多小文件打包成一个分割,因此映射器更少,每个映射器有更多数据要处理。
这是我所做的:
def createDataFrame(filePaths: Array[Path], sc: SparkSession, inputs: AvroConsolidatorInputs): DataFrame = {
val job: Job = Job.getInstance(sc.sparkContext.hadoopConfiguration)
FileInputFormat.setInputPaths(job, filePaths: _*)
val sqlType = SchemaConverters.toSqlType(getSchema(inputs.logSubType))
val rddKV = sc.sparkContext.newAPIHadoopRDD(
job.getConfiguration,
classOf[CombinedAvroKeyInputFormat[GenericRecord]],
classOf[AvroKey[GenericRecord]],
classOf[NullWritable])
val rowRDD = rddKV.mapPartitions(
f = (iter: Iterator[(AvroKey[GenericRecord], NullWritable)]) =>
iter.map(_._1.datum()).map(genericRecordToRow(_, sqlType))
, preservesPartitioning = true)
val df = sc.sqlContext.createDataFrame(rowRDD ,
sqlType.dataType.asInstanceOf[StructType])
df
CombinedAvroKeyInputFormat 是用户定义的 class,它扩展了 CombineFileInputFormat 并将 64MB 的数据放入单个拆分中。
object CombinedAvroKeyInputFormat {
class CombinedAvroKeyRecordReader[T](var inputSplit: CombineFileSplit, context: TaskAttemptContext, idx: Integer)
extends AvroKeyRecordReader[T](AvroJob.getInputKeySchema(context.getConfiguration))
{
@throws[IOException]
@throws[InterruptedException]
override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
this.inputSplit = inputSplit.asInstanceOf[CombineFileSplit]
val fileSplit = new FileSplit(this.inputSplit.getPath(idx),
this.inputSplit.getOffset(idx),
this.inputSplit.getLength(idx),
this.inputSplit.getLocations)
super.initialize(fileSplit, context)
}
}
}
/*
* The class CombineFileInputFormat is an abstract class with no implementation, so we must create a subclass to support it;
* We’ll name the subclass CombinedAvroKeyInputFormat. The subclass will initiate a delegate CombinedAvroKeyRecordReader that extends AvroKeyRecordReader
*/
class CombinedAvroKeyInputFormat[T] extends CombineFileInputFormat[AvroKey[T], NullWritable] {
val logger = Logger.getLogger(AvroConsolidator.getClass)
setMaxSplitSize(67108864)
def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[AvroKey[T], NullWritable] = {
val c = classOf[CombinedAvroKeyInputFormat.CombinedAvroKeyRecordReader[_]]
val inputSplit = split.asInstanceOf[CombineFileSplit]
/*
* CombineFileRecordReader is a built in class that pass each split to our class CombinedAvroKeyRecordReader
* When the hadoop job starts, CombineFileRecordReader reads all the file sizes in HDFS that we want it to process,
* and decides how many splits base on the MaxSplitSize
*/
return new CombineFileRecordReader[AvroKey[T], NullWritable](
inputSplit,
context,
c.asInstanceOf[Class[_ <: RecordReader[AvroKey[T], NullWritable]]])
}
}
这使得小文件的读取速度更快
我在从 AWS S3 读取 100 多个小 avro 文件时遇到了类似的问题:
spark.read.format("avro").load(<file_directory_path_containing_many_avro_files>)
在完成大部分计划任务后,作业会在不同的时间点挂起。例如,它将 运行 在 25 秒内快速完成 111 项任务中的 110 项,并在 110 处挂起一次,而在下一次尝试时,它将在 111 项任务中的第 98 项任务处挂起。它没有通过挂点。
此处引用spark配置指南:
虽然不能解决挂起的最初原因,但下面的 spark 配置被证明是一种快速修复和解决方法。
将 spark.speculation
设置为 true 解决了这个问题。
我知道这个在HDFS中读取大量小文件的问题一直是一个问题并且被广泛讨论,但请耐心等待。大多数处理此类问题的 Whosebug 问题都与读取大量 txt 有关 files.I'm trying to read a large number of small avro files
加上这些读取 txt 文件的解决方案讨论了使用 WholeTextFileInputFormat 或 CombineInputFormat (),它们是 RDD 实现,我使用的是 Spark 2.4 (HDFS 3.0.0),RDD 实现通常是不鼓励的,数据帧是首选。我更喜欢使用数据帧,但也对 RDD 实现持开放态度。
我已经尝试按照 Murtaza 的建议合并数据帧,但是在大量文件上出现 OOM 错误 ()
我正在使用以下代码
val filePaths = avroConsolidator.getFilesInDateRangeWithExtension //pattern:filePaths: Array[String]
//I do need to create a list of file paths as I need to filter files based on file names. Need this logic for some upstream process
//example : Array("hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1530.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1531.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1532.avro")
val df_mid = sc.read.format("com.databricks.spark.avro").load(filePaths: _*)
val df = df_mid
.withColumn("dt", date_format(df_mid.col("timeStamp"), "yyyy-MM-dd"))
.filter("dt != 'null'")
df
.repartition(partitionColumns(inputs.logSubType).map(new org.apache.spark.sql.Column(_)):_*)
.write.partitionBy(partitionColumns(inputs.logSubType): _*)
.mode(SaveMode.Append)
.option("compression","snappy")
.parquet(avroConsolidator.parquetFilePath.toString)
在作业级别列出 183 个小文件用了 1.6 分钟
奇怪的是我的舞台 UI 页面只显示 3s(不明白为什么)
avro 文件存储在 yyyy/mm/dd 个分区中:hdfs://server123:8020/source/Avro/weblog/2019/06/03
有什么方法可以加快叶文件的列表,正如您从屏幕截图中看到的那样,只需 6 秒即可整合到镶木地板文件中,但列出文件需要 1.3 分钟
由于读取大量小文件的时间太长,我退后一步,使用 CombineFileInputFormat 创建 RDD。 This InputFormat 适用于小文件,因为它将许多小文件打包成一个分割,因此映射器更少,每个映射器有更多数据要处理。
这是我所做的:
def createDataFrame(filePaths: Array[Path], sc: SparkSession, inputs: AvroConsolidatorInputs): DataFrame = {
val job: Job = Job.getInstance(sc.sparkContext.hadoopConfiguration)
FileInputFormat.setInputPaths(job, filePaths: _*)
val sqlType = SchemaConverters.toSqlType(getSchema(inputs.logSubType))
val rddKV = sc.sparkContext.newAPIHadoopRDD(
job.getConfiguration,
classOf[CombinedAvroKeyInputFormat[GenericRecord]],
classOf[AvroKey[GenericRecord]],
classOf[NullWritable])
val rowRDD = rddKV.mapPartitions(
f = (iter: Iterator[(AvroKey[GenericRecord], NullWritable)]) =>
iter.map(_._1.datum()).map(genericRecordToRow(_, sqlType))
, preservesPartitioning = true)
val df = sc.sqlContext.createDataFrame(rowRDD ,
sqlType.dataType.asInstanceOf[StructType])
df
CombinedAvroKeyInputFormat 是用户定义的 class,它扩展了 CombineFileInputFormat 并将 64MB 的数据放入单个拆分中。
object CombinedAvroKeyInputFormat {
class CombinedAvroKeyRecordReader[T](var inputSplit: CombineFileSplit, context: TaskAttemptContext, idx: Integer)
extends AvroKeyRecordReader[T](AvroJob.getInputKeySchema(context.getConfiguration))
{
@throws[IOException]
@throws[InterruptedException]
override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
this.inputSplit = inputSplit.asInstanceOf[CombineFileSplit]
val fileSplit = new FileSplit(this.inputSplit.getPath(idx),
this.inputSplit.getOffset(idx),
this.inputSplit.getLength(idx),
this.inputSplit.getLocations)
super.initialize(fileSplit, context)
}
}
}
/*
* The class CombineFileInputFormat is an abstract class with no implementation, so we must create a subclass to support it;
* We’ll name the subclass CombinedAvroKeyInputFormat. The subclass will initiate a delegate CombinedAvroKeyRecordReader that extends AvroKeyRecordReader
*/
class CombinedAvroKeyInputFormat[T] extends CombineFileInputFormat[AvroKey[T], NullWritable] {
val logger = Logger.getLogger(AvroConsolidator.getClass)
setMaxSplitSize(67108864)
def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[AvroKey[T], NullWritable] = {
val c = classOf[CombinedAvroKeyInputFormat.CombinedAvroKeyRecordReader[_]]
val inputSplit = split.asInstanceOf[CombineFileSplit]
/*
* CombineFileRecordReader is a built in class that pass each split to our class CombinedAvroKeyRecordReader
* When the hadoop job starts, CombineFileRecordReader reads all the file sizes in HDFS that we want it to process,
* and decides how many splits base on the MaxSplitSize
*/
return new CombineFileRecordReader[AvroKey[T], NullWritable](
inputSplit,
context,
c.asInstanceOf[Class[_ <: RecordReader[AvroKey[T], NullWritable]]])
}
}
这使得小文件的读取速度更快
我在从 AWS S3 读取 100 多个小 avro 文件时遇到了类似的问题:
spark.read.format("avro").load(<file_directory_path_containing_many_avro_files>)
在完成大部分计划任务后,作业会在不同的时间点挂起。例如,它将 运行 在 25 秒内快速完成 111 项任务中的 110 项,并在 110 处挂起一次,而在下一次尝试时,它将在 111 项任务中的第 98 项任务处挂起。它没有通过挂点。
此处引用spark配置指南:
虽然不能解决挂起的最初原因,但下面的 spark 配置被证明是一种快速修复和解决方法。
将 spark.speculation
设置为 true 解决了这个问题。