如何使用结构化流检查点管理 HDFS 内存
How to manage HDFS memory with Structured Streaming Checkpoints
我有一个很长的 运行ning 结构化流作业,它使用多个 Kafka 主题并通过滑动 window 聚合。我需要了解检查点在 HDFS 中是如何 managed/cleaned 的。
Jobs 运行 很好,我能够从失败的步骤中恢复而没有数据丢失,但是,我可以看到 HDFS 利用率每天都在增加。我找不到任何关于 Spark manages/cleans 如何建立检查点的文档。以前检查点存储在 s3 上,但事实证明这是相当昂贵的,因为大量的小文件 read/written。
query = formatted_stream.writeStream \
.format("kafka") \
.outputMode(output_mode) \
.option("kafka.bootstrap.servers", bootstrap_servers) \
.option("checkpointLocation", "hdfs:///path_to_checkpoints") \
.start()
据我了解,检查点应该自动清理;几天后,我只看到我的 HDFS 利用率呈线性增长。我如何确保检查点得到管理并且 HDFS 不会 运行 超出 space?
的已接受答案告知结构化流式处理应该处理此问题,但不是如何或如何配置它。
正如您在 the code for Checkpoint.scala 中看到的那样,检查点机制保留了最后 10 个检查点数据,但是几天后这应该不是问题。
一个常见的原因是你持久化在磁盘上的 RDDs 也随着时间线性增长。这可能是由于某些您不关心持久化的 RDD。
您需要确保在使用结构化流时没有需要持久化的增长 RDD。例如,如果你想计算数据集一列上不同元素的精确计数,你需要知道完整的输入数据(这意味着持久化随时间线性增加的数据,如果你每批数据不断涌入).相反,如果您可以使用近似计数,则可以使用诸如 HyperLogLog++ 之类的算法,这种算法通常需要更少的内存来权衡精度。
请记住,如果您使用的是 Spark SQL,您可能需要进一步检查优化查询的结果,因为这可能与 Catalyst 优化查询的方式有关。如果您不是,那么 Catalyst 可能会为您优化您的查询。
在任何情况下,进一步思考:如果检查点的使用量随着时间的推移而增加,这应该反映在您的流式处理作业也随着时间线性消耗更多的 RAM,因为检查点只是 Spark 上下文的序列化(加上固定大小的元数据)。如果是这种情况,请检查 SO 是否存在相关问题,例如 why does memory usage of Spark Worker increase with time?.
此外,对你调用 .persist()
的 RDD 有意义(以及缓存级别,以便你可以将元数据写入磁盘 RDD,并且一次只将它们部分加载到 Spark 上下文中)。
我有一个很长的 运行ning 结构化流作业,它使用多个 Kafka 主题并通过滑动 window 聚合。我需要了解检查点在 HDFS 中是如何 managed/cleaned 的。
Jobs 运行 很好,我能够从失败的步骤中恢复而没有数据丢失,但是,我可以看到 HDFS 利用率每天都在增加。我找不到任何关于 Spark manages/cleans 如何建立检查点的文档。以前检查点存储在 s3 上,但事实证明这是相当昂贵的,因为大量的小文件 read/written。
query = formatted_stream.writeStream \
.format("kafka") \
.outputMode(output_mode) \
.option("kafka.bootstrap.servers", bootstrap_servers) \
.option("checkpointLocation", "hdfs:///path_to_checkpoints") \
.start()
据我了解,检查点应该自动清理;几天后,我只看到我的 HDFS 利用率呈线性增长。我如何确保检查点得到管理并且 HDFS 不会 运行 超出 space?
正如您在 the code for Checkpoint.scala 中看到的那样,检查点机制保留了最后 10 个检查点数据,但是几天后这应该不是问题。
一个常见的原因是你持久化在磁盘上的 RDDs 也随着时间线性增长。这可能是由于某些您不关心持久化的 RDD。
您需要确保在使用结构化流时没有需要持久化的增长 RDD。例如,如果你想计算数据集一列上不同元素的精确计数,你需要知道完整的输入数据(这意味着持久化随时间线性增加的数据,如果你每批数据不断涌入).相反,如果您可以使用近似计数,则可以使用诸如 HyperLogLog++ 之类的算法,这种算法通常需要更少的内存来权衡精度。
请记住,如果您使用的是 Spark SQL,您可能需要进一步检查优化查询的结果,因为这可能与 Catalyst 优化查询的方式有关。如果您不是,那么 Catalyst 可能会为您优化您的查询。
在任何情况下,进一步思考:如果检查点的使用量随着时间的推移而增加,这应该反映在您的流式处理作业也随着时间线性消耗更多的 RAM,因为检查点只是 Spark 上下文的序列化(加上固定大小的元数据)。如果是这种情况,请检查 SO 是否存在相关问题,例如 why does memory usage of Spark Worker increase with time?.
此外,对你调用 .persist()
的 RDD 有意义(以及缓存级别,以便你可以将元数据写入磁盘 RDD,并且一次只将它们部分加载到 Spark 上下文中)。