如何正确管理从 Spark Streaming 生成的分区镶木地板文件
How to do proper housekeeping of partitioned parquet files generated from Spark Streaming
我的 Spark 结构化流作业不断生成我想在到期后删除的 parquet 文件(假设 30 天后)。
我将我的 parquet 数据分区,分区键是 RFC3339/ISO8601 中的事件日期,这样可以根据 cron 作业在 HDFS 级别上相当容易地完成内务处理(删除所有 parquet-folders with partitionkey < oldestAllowedAge 在字符串比较方面)。
但是,自从我引入了 Spark Streaming,Spark 将元数据写入到一个名为 _spark_metadata
的文件夹中,该文件夹位于要写入的数据本身旁边。如果我现在只删除过期的 HDFS 文件和 运行 整个数据集上的 spark 批处理作业,该作业将因找不到文件而失败。批处理作业将读取元数据并期望已删除的文件存在。
对此的简单解决方案是仅禁用 _spark_metadata
目录的创建,如此处所述:。但由于我不想在读取常规批处理分析的数据时失去性能,我想知道是否有更好的解决方案。
我想,我可以只使用 spark 进行删除,以便它删除 parquet hdfs 文件 AND 更新元数据。但是,只需执行
session.sql(String.format("DELETE FROM parquet.`%s` WHERE partitionKey < " + oldestAllowedPartitionAge, path.toString()));
不起作用。 DELETE
遗憾的是,Spark 中的操作不受支持...
有什么解决方案可以让我删除旧数据但 _spark_metadata
文件夹仍然有效吗?
据我了解,_spark_metadata
的主要目的是确保容错并避免列出所有要处理的文件:
In order to correctly handle partial failures while maintaining
exactly once semantics, the files for each batch are written out to a
unique directory and then atomically appended to a metadata log. When
a parquet based DataSource
is initialized for reading, we first
check for this log directory and use it instead of file listing when
present.
https://github.com/apache/spark/commit/6bc4be64f86afcb38e4444c80c9400b7b6b745de
你引用的 link () 解释说问题来自不一致的检查点状态 - 检查点生成元数据但后来用户手动删除它并且当他重新启动查询时,它失败了因为检查点应该有元数据文件。
要查看缺少元数据是否会导致批处理失败,请查看 org.apache.spark.sql.execution.datasources.DataSource#resolveRelation 方法,您可以在其中找到与 2 个案例匹配的模式:
// We are reading from the results of a streaming query. Load files from the metadata log
// instead of listing them using HDFS APIs.
case (format: FileFormat, _)
if FileStreamSink.hasMetadata(
caseInsensitiveOptions.get("path").toSeq ++ paths,
sparkSession.sessionState.newHadoopConf()) =>
case (format: FileFormat, _) =>
val globbedPaths =
checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
和 hasMetadata
方法如下所示:
def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = {
path match {
case Seq(singlePath) =>
try {
val hdfsPath = new Path(singlePath)
val fs = hdfsPath.getFileSystem(hadoopConf)
if (fs.isDirectory(hdfsPath)) {
fs.exists(new Path(hdfsPath, metadataDir))
} else {
false
}
} catch {
case NonFatal(e) =>
logWarning(s"Error while looking for metadata directory.")
false
}
case _ => false
}
}
如您所见,没有失败的风险(至少通过阅读代码!)。如果您有一些,请提供更多上下文,因为问题可能出在其他地方。
关于您的性能问题,此 _spark_metadata
仅包含文件列表,因此当然,Spark 将首先需要列出您输入目录中的文件。但根据我的经验,这并不是成本最高的操作。例如,在 AWS S3 上列出包含 1297 个文件的目录大约需要 9 秒。之后,由您决定是要进行简单的清洁过程还是稍微慢一些的批处理。如果你有更多这样的文件,也许你也应该将它们分成更大的文件,比如 256 MB 或更多?
不过,如果您想保留 _spark_metadata
,也许有一种方法可以通过清理应用程序删除文件。但这将具有挑战性,因为您将有 2 个应用程序(流式处理和清理)处理相同的数据。
您可以在此处找到有关 _spark_metadata
的更多信息:
这实际上是结构化流式传输 (SPARK-24295) 中的已知问题之一,尽管它只发生在大量输入文件中,并且最终用户正在采取自己的解决方法。例如,停止查询 -> 删除旧的输入文件 -> 手动操作元数据以清除它们 -> 重新启动查询。
鉴于手动操作元数据并非微不足道且不理想(假设它应该停止流式查询,并强制最终用户了解元数据的格式),SPARK-27188 被提议作为替代方案 - 它应用保留和从元数据中清除过时的输入文件。
据我所知,这个有三个选项可以解决这个问题:
1) 使用spark.load(filePathsUsingGlobRegex)
只加载需要读取的文件,这样spark不需要加载所有文件,因此不需要spark_metadata.
优点:你仍然可以获得spark_metadata的好处(读取速度更快,exactly-once语义仍然得到保证)
缺点:您必须自己构建文件的路径,如果您的数据存储在各种分区策略中,这可能会更加混乱。
2) 不要在输出目录
中创建spark_metadata
优点:清理很简单
缺点:你失去了spark_metadata的好处。
3) 在升级时了解并更新 spark_metadata 文件,删除旧文件。
优点:您同时拥有保留工作和 spark_metadata 好处。
缺点:您必须手动更改 _spark_metadata,它可以是 tough/messy 代码来维护。鉴于这是内部的火花并且可以改变。
我的 Spark 结构化流作业不断生成我想在到期后删除的 parquet 文件(假设 30 天后)。
我将我的 parquet 数据分区,分区键是 RFC3339/ISO8601 中的事件日期,这样可以根据 cron 作业在 HDFS 级别上相当容易地完成内务处理(删除所有 parquet-folders with partitionkey < oldestAllowedAge 在字符串比较方面)。
但是,自从我引入了 Spark Streaming,Spark 将元数据写入到一个名为 _spark_metadata
的文件夹中,该文件夹位于要写入的数据本身旁边。如果我现在只删除过期的 HDFS 文件和 运行 整个数据集上的 spark 批处理作业,该作业将因找不到文件而失败。批处理作业将读取元数据并期望已删除的文件存在。
对此的简单解决方案是仅禁用 _spark_metadata
目录的创建,如此处所述:
我想,我可以只使用 spark 进行删除,以便它删除 parquet hdfs 文件 AND 更新元数据。但是,只需执行
session.sql(String.format("DELETE FROM parquet.`%s` WHERE partitionKey < " + oldestAllowedPartitionAge, path.toString()));
不起作用。 DELETE
遗憾的是,Spark 中的操作不受支持...
有什么解决方案可以让我删除旧数据但 _spark_metadata
文件夹仍然有效吗?
据我了解,_spark_metadata
的主要目的是确保容错并避免列出所有要处理的文件:
In order to correctly handle partial failures while maintaining exactly once semantics, the files for each batch are written out to a unique directory and then atomically appended to a metadata log. When a parquet based
DataSource
is initialized for reading, we first check for this log directory and use it instead of file listing when present.
https://github.com/apache/spark/commit/6bc4be64f86afcb38e4444c80c9400b7b6b745de
你引用的 link (
要查看缺少元数据是否会导致批处理失败,请查看 org.apache.spark.sql.execution.datasources.DataSource#resolveRelation 方法,您可以在其中找到与 2 个案例匹配的模式:
// We are reading from the results of a streaming query. Load files from the metadata log
// instead of listing them using HDFS APIs.
case (format: FileFormat, _)
if FileStreamSink.hasMetadata(
caseInsensitiveOptions.get("path").toSeq ++ paths,
sparkSession.sessionState.newHadoopConf()) =>
case (format: FileFormat, _) =>
val globbedPaths =
checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
和 hasMetadata
方法如下所示:
def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = {
path match {
case Seq(singlePath) =>
try {
val hdfsPath = new Path(singlePath)
val fs = hdfsPath.getFileSystem(hadoopConf)
if (fs.isDirectory(hdfsPath)) {
fs.exists(new Path(hdfsPath, metadataDir))
} else {
false
}
} catch {
case NonFatal(e) =>
logWarning(s"Error while looking for metadata directory.")
false
}
case _ => false
}
}
如您所见,没有失败的风险(至少通过阅读代码!)。如果您有一些,请提供更多上下文,因为问题可能出在其他地方。
关于您的性能问题,此 _spark_metadata
仅包含文件列表,因此当然,Spark 将首先需要列出您输入目录中的文件。但根据我的经验,这并不是成本最高的操作。例如,在 AWS S3 上列出包含 1297 个文件的目录大约需要 9 秒。之后,由您决定是要进行简单的清洁过程还是稍微慢一些的批处理。如果你有更多这样的文件,也许你也应该将它们分成更大的文件,比如 256 MB 或更多?
不过,如果您想保留 _spark_metadata
,也许有一种方法可以通过清理应用程序删除文件。但这将具有挑战性,因为您将有 2 个应用程序(流式处理和清理)处理相同的数据。
您可以在此处找到有关 _spark_metadata
的更多信息:
这实际上是结构化流式传输 (SPARK-24295) 中的已知问题之一,尽管它只发生在大量输入文件中,并且最终用户正在采取自己的解决方法。例如,停止查询 -> 删除旧的输入文件 -> 手动操作元数据以清除它们 -> 重新启动查询。
鉴于手动操作元数据并非微不足道且不理想(假设它应该停止流式查询,并强制最终用户了解元数据的格式),SPARK-27188 被提议作为替代方案 - 它应用保留和从元数据中清除过时的输入文件。
据我所知,这个有三个选项可以解决这个问题:
1) 使用spark.load(filePathsUsingGlobRegex)
只加载需要读取的文件,这样spark不需要加载所有文件,因此不需要spark_metadata.
优点:你仍然可以获得spark_metadata的好处(读取速度更快,exactly-once语义仍然得到保证)
缺点:您必须自己构建文件的路径,如果您的数据存储在各种分区策略中,这可能会更加混乱。
2) 不要在输出目录
优点:清理很简单
缺点:你失去了spark_metadata的好处。
3) 在升级时了解并更新 spark_metadata 文件,删除旧文件。
优点:您同时拥有保留工作和 spark_metadata 好处。
缺点:您必须手动更改 _spark_metadata,它可以是 tough/messy 代码来维护。鉴于这是内部的火花并且可以改变。