如何处理 spark structured streaming 中的小文件问题?
How to handle small file problem in spark structured streaming?
我的项目中有一个场景,我正在使用 spark-sql-2.4.1 版本阅读 kafka 主题消息。我能够使用结构化流处理这一天。收到数据并经过处理后,我需要将数据保存到 hdfs 存储中的相应镶木地板文件中。
我能够存储和读取镶木地板文件,我将触发时间保持在 15 秒到 1 分钟之间。这些文件非常小,因此导致文件很多。
这些镶木地板文件需要稍后通过配置单元查询来读取。
所以
1)这个策略在生产环境中有效吗?还是以后会导致任何小文件问题?
2) handle/design 这种场景(即行业标准)的最佳实践是什么?
3) 生产中一般是怎么处理这些事情的?
谢谢。
我们也遇到了类似的问题。在大量谷歌搜索之后,似乎普遍接受的方法是编写另一个作业,该作业经常聚合许多小文件并将它们写入其他地方的更大、合并的文件中。这就是我们现在所做的。
顺便说一句:无论如何,您在这里可以做的事情是有限制的,因为您拥有的并行性越多,文件的数量就越多,因为每个执行程序线程都会写入自己的文件。他们从不写入共享文件。这似乎是并行处理的野兽的本性。
我知道这个问题太老了。我有类似的问题,我已经使用 spark 结构化流式查询侦听器来解决这个问题。
我的用例是从 kafka 获取数据并存储在具有年、月、日和小时分区的 hdfs 中。
下面的代码将获取前一小时的分区数据,应用重新分区并覆盖现有分区中的数据。
val session = SparkSession.builder().master("local[2]").enableHiveSupport().getOrCreate()
session.streams.addListener(AppListener(config,session))
class AppListener(config: Config,spark: SparkSession) extends StreamingQueryListener {
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
this.synchronized {AppListener.mergeFiles(event.progress.timestamp,spark,config)}
}
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}
object AppListener {
def mergeFiles(currentTs: String,spark: SparkSession,config:Config):Unit = {
val configs = config.kafka(config.key.get)
if(currentTs.datetime.isAfter(Processed.ts.plusMinutes(5))) {
println(
s"""
|Current Timestamp : ${currentTs}
|Merge Files : ${Processed.ts.minusHours(1)}
|
|""".stripMargin)
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val ts = Processed.ts.minusHours(1)
val hdfsPath = s"${configs.hdfsLocation}/year=${ts.getYear}/month=${ts.getMonthOfYear}/day=${ts.getDayOfMonth}/hour=${ts.getHourOfDay}"
val path = new Path(hdfsPath)
if(fs.exists(path)) {
val hdfsFiles = fs.listLocatedStatus(path)
.filter(lfs => lfs.isFile && !lfs.getPath.getName.contains("_SUCCESS"))
.map(_.getPath).toList
println(
s"""
|Total files in HDFS location : ${hdfsFiles.length}
| ${hdfsFiles.length > 1}
|""".stripMargin)
if(hdfsFiles.length > 1) {
println(
s"""
|Merge Small Files
|==============================================
|HDFS Path : ${hdfsPath}
|Total Available files : ${hdfsFiles.length}
|Status : Running
|
|""".stripMargin)
val df = spark.read.format(configs.writeFormat).load(hdfsPath).cache()
df.repartition(1)
.write
.format(configs.writeFormat)
.mode("overwrite")
.save(s"/tmp${hdfsPath}")
df.cache().unpersist()
spark
.read
.format(configs.writeFormat)
.load(s"/tmp${hdfsPath}")
.write
.format(configs.writeFormat)
.mode("overwrite")
.save(hdfsPath)
Processed.ts = Processed.ts.plusHours(1).toDateTime("yyyy-MM-dd'T'HH:00:00")
println(
s"""
|Merge Small Files
|==============================================
|HDFS Path : ${hdfsPath}
|Total files : ${hdfsFiles.length}
|Status : Completed
|
|""".stripMargin)
}
}
}
}
def apply(config: Config,spark: SparkSession): AppListener = new AppListener(config,spark)
}
object Processed {
var ts: DateTime = DateTime.now(DateTimeZone.forID("UTC")).toDateTime("yyyy-MM-dd'T'HH:00:00")
}
有时数据很大,我使用以下逻辑将数据分成多个文件。文件大小约为 160 MB
val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
val dataSize = bytes.toLong
val numPartitions = (bytes.toLong./(1024.0)./(1024.0)./(10240)).ceil.toInt
df.repartition(if(numPartitions == 0) 1 else numPartitions)
.[...]
编辑-1
使用这个 - spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes 一旦数据帧加载到内存中,我们就可以获得实际数据帧的大小,因为例如,您可以查看以下代码。
scala> val df = spark.read.format("orc").load("/tmp/srinivas/")
df: org.apache.spark.sql.DataFrame = [channelGrouping: string, clientId: string ... 75 more fields]
scala> import org.apache.commons.io.FileUtils
import org.apache.commons.io.FileUtils
scala> val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
bytes: BigInt = 763275709
scala> FileUtils.byteCountToDisplaySize(bytes.toLong)
res5: String = 727 MB
scala> import sys.process._
import sys.process._
scala> "hdfs dfs -ls -h /tmp/srinivas/".!
Found 2 items
-rw-r----- 3 svcmxns hdfs 0 2020-04-20 01:46 /tmp/srinivas/_SUCCESS
-rw-r----- 3 svcmxns hdfs 727.4 M 2020-04-20 01:46 /tmp/srinivas/part-00000-9d0b72ea-f617-4092-ae27-d36400c17917-c000.snappy.orc
res6: Int = 0
这是火花流的常见热点问题,没有固定答案。
我采用了一种基于追加思想的非常规方法。
由于您使用的是 spark 2.4.1,此解决方案将有所帮助。
因此,如果像 parquet 或 orc 这样的列式文件格式支持追加,那会更容易,因为新数据可以追加到同一个文件中,并且文件大小会在每个微批处理后变得越来越大.
但是,由于不受支持,我采用了版本控制方法来实现这一点。在每个微批次之后,都会使用版本分区生成数据。
例如
/prod/mobility/cdr_data/date=01–01–2010/version=12345/file1.parquet
/prod/mobility/cdr_data/date=01–01–2010/version=23456/file1.parquet
我们可以做的是,在每个微批次中,读取旧版本数据,将其与新流数据合并,并在与新版本相同的路径上再次写入。然后,删除旧版本。这样每一个微批之后,每个分区都会有一个版本和一个文件。每个分区中的文件大小将不断增长并变大。
由于不允许流数据集和静态数据集的并集,我们可以使用forEachBatch sink(在spark >=2.4.0中可用)将流数据集转换为静态数据集。
我已经在 link 中描述了如何以最佳方式实现这一点。你可能想看看。
https://medium.com/@kumar.rahul.nitk/solving-small-file-problem-in-spark-structured-streaming-a-versioning-approach-73a0153a0a
我的项目中有一个场景,我正在使用 spark-sql-2.4.1 版本阅读 kafka 主题消息。我能够使用结构化流处理这一天。收到数据并经过处理后,我需要将数据保存到 hdfs 存储中的相应镶木地板文件中。
我能够存储和读取镶木地板文件,我将触发时间保持在 15 秒到 1 分钟之间。这些文件非常小,因此导致文件很多。
这些镶木地板文件需要稍后通过配置单元查询来读取。
所以 1)这个策略在生产环境中有效吗?还是以后会导致任何小文件问题?
2) handle/design 这种场景(即行业标准)的最佳实践是什么?
3) 生产中一般是怎么处理这些事情的?
谢谢。
我们也遇到了类似的问题。在大量谷歌搜索之后,似乎普遍接受的方法是编写另一个作业,该作业经常聚合许多小文件并将它们写入其他地方的更大、合并的文件中。这就是我们现在所做的。
顺便说一句:无论如何,您在这里可以做的事情是有限制的,因为您拥有的并行性越多,文件的数量就越多,因为每个执行程序线程都会写入自己的文件。他们从不写入共享文件。这似乎是并行处理的野兽的本性。
我知道这个问题太老了。我有类似的问题,我已经使用 spark 结构化流式查询侦听器来解决这个问题。
我的用例是从 kafka 获取数据并存储在具有年、月、日和小时分区的 hdfs 中。
下面的代码将获取前一小时的分区数据,应用重新分区并覆盖现有分区中的数据。
val session = SparkSession.builder().master("local[2]").enableHiveSupport().getOrCreate()
session.streams.addListener(AppListener(config,session))
class AppListener(config: Config,spark: SparkSession) extends StreamingQueryListener {
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
this.synchronized {AppListener.mergeFiles(event.progress.timestamp,spark,config)}
}
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}
object AppListener {
def mergeFiles(currentTs: String,spark: SparkSession,config:Config):Unit = {
val configs = config.kafka(config.key.get)
if(currentTs.datetime.isAfter(Processed.ts.plusMinutes(5))) {
println(
s"""
|Current Timestamp : ${currentTs}
|Merge Files : ${Processed.ts.minusHours(1)}
|
|""".stripMargin)
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val ts = Processed.ts.minusHours(1)
val hdfsPath = s"${configs.hdfsLocation}/year=${ts.getYear}/month=${ts.getMonthOfYear}/day=${ts.getDayOfMonth}/hour=${ts.getHourOfDay}"
val path = new Path(hdfsPath)
if(fs.exists(path)) {
val hdfsFiles = fs.listLocatedStatus(path)
.filter(lfs => lfs.isFile && !lfs.getPath.getName.contains("_SUCCESS"))
.map(_.getPath).toList
println(
s"""
|Total files in HDFS location : ${hdfsFiles.length}
| ${hdfsFiles.length > 1}
|""".stripMargin)
if(hdfsFiles.length > 1) {
println(
s"""
|Merge Small Files
|==============================================
|HDFS Path : ${hdfsPath}
|Total Available files : ${hdfsFiles.length}
|Status : Running
|
|""".stripMargin)
val df = spark.read.format(configs.writeFormat).load(hdfsPath).cache()
df.repartition(1)
.write
.format(configs.writeFormat)
.mode("overwrite")
.save(s"/tmp${hdfsPath}")
df.cache().unpersist()
spark
.read
.format(configs.writeFormat)
.load(s"/tmp${hdfsPath}")
.write
.format(configs.writeFormat)
.mode("overwrite")
.save(hdfsPath)
Processed.ts = Processed.ts.plusHours(1).toDateTime("yyyy-MM-dd'T'HH:00:00")
println(
s"""
|Merge Small Files
|==============================================
|HDFS Path : ${hdfsPath}
|Total files : ${hdfsFiles.length}
|Status : Completed
|
|""".stripMargin)
}
}
}
}
def apply(config: Config,spark: SparkSession): AppListener = new AppListener(config,spark)
}
object Processed {
var ts: DateTime = DateTime.now(DateTimeZone.forID("UTC")).toDateTime("yyyy-MM-dd'T'HH:00:00")
}
有时数据很大,我使用以下逻辑将数据分成多个文件。文件大小约为 160 MB
val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
val dataSize = bytes.toLong
val numPartitions = (bytes.toLong./(1024.0)./(1024.0)./(10240)).ceil.toInt
df.repartition(if(numPartitions == 0) 1 else numPartitions)
.[...]
编辑-1
使用这个 - spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes 一旦数据帧加载到内存中,我们就可以获得实际数据帧的大小,因为例如,您可以查看以下代码。
scala> val df = spark.read.format("orc").load("/tmp/srinivas/")
df: org.apache.spark.sql.DataFrame = [channelGrouping: string, clientId: string ... 75 more fields]
scala> import org.apache.commons.io.FileUtils
import org.apache.commons.io.FileUtils
scala> val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
bytes: BigInt = 763275709
scala> FileUtils.byteCountToDisplaySize(bytes.toLong)
res5: String = 727 MB
scala> import sys.process._
import sys.process._
scala> "hdfs dfs -ls -h /tmp/srinivas/".!
Found 2 items
-rw-r----- 3 svcmxns hdfs 0 2020-04-20 01:46 /tmp/srinivas/_SUCCESS
-rw-r----- 3 svcmxns hdfs 727.4 M 2020-04-20 01:46 /tmp/srinivas/part-00000-9d0b72ea-f617-4092-ae27-d36400c17917-c000.snappy.orc
res6: Int = 0
这是火花流的常见热点问题,没有固定答案。 我采用了一种基于追加思想的非常规方法。 由于您使用的是 spark 2.4.1,此解决方案将有所帮助。
因此,如果像 parquet 或 orc 这样的列式文件格式支持追加,那会更容易,因为新数据可以追加到同一个文件中,并且文件大小会在每个微批处理后变得越来越大. 但是,由于不受支持,我采用了版本控制方法来实现这一点。在每个微批次之后,都会使用版本分区生成数据。 例如
/prod/mobility/cdr_data/date=01–01–2010/version=12345/file1.parquet
/prod/mobility/cdr_data/date=01–01–2010/version=23456/file1.parquet
我们可以做的是,在每个微批次中,读取旧版本数据,将其与新流数据合并,并在与新版本相同的路径上再次写入。然后,删除旧版本。这样每一个微批之后,每个分区都会有一个版本和一个文件。每个分区中的文件大小将不断增长并变大。
由于不允许流数据集和静态数据集的并集,我们可以使用forEachBatch sink(在spark >=2.4.0中可用)将流数据集转换为静态数据集。
我已经在 link 中描述了如何以最佳方式实现这一点。你可能想看看。 https://medium.com/@kumar.rahul.nitk/solving-small-file-problem-in-spark-structured-streaming-a-versioning-approach-73a0153a0a