Spark 清理溢出到磁盘的随机播放
Spark clean up shuffle spilled to disk
我有一个循环操作,它生成一些 RDD,进行重新分区,然后进行 aggregatebykey 操作。循环运行s次后,计算出一个最终的RDD,缓存并checkpoint,也作为下一次循环的初始RDD。
这些 RDD 非常大,在每次迭代到达最终 RDD 之前会生成大量中间随机块。我正在压缩我的 shuffle 并允许 shuffle 溢出到磁盘。
我注意到我的工作机器上存储随机文件的工作目录没有被清理。因此最终我 运行 出磁盘 space。我的印象是,如果我检查我的 RDD,它会删除所有中间洗牌块。然而,这似乎没有发生。有没有人知道我如何在每次循环迭代后清理我的随机播放块,或者为什么我的随机播放块没有被清理?
一旦您将 RDD 缓存到您的 memory/disk,只要 spark 上下文存在,RDD 就会存储在您的 memory/disk。
为了告诉驱动程序它可以从 memory/disk 中删除 RDD,您需要使用 unpersist()
函数。
From the java-doc
:
/**
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
*
* @param blocking Whether to block until all blocks are deleted.
* @return This RDD.
*/
def unpersist(blocking: Boolean = true)
所以你可以使用:
rdd.unpersist()
取决于您是否在这些 RDD 之间存在依赖关系。例如:
val rdd2 = rdd1.<transformation>
val rdd3 = rdd2.<transformation>
...
在这种情况下,spark 会记住沿袭,并且总会有对旧 RDD 的引用,这使得它不会被 spark driver 选择清理(spark rdd clean up is down by gc on spark driver回收一些不再引用的 rdd 引用)。
所以 persist() 在这种情况下不起作用,唯一的方法是使用 localCheckpoint()
。这是我以前做过的并且为我工作的:
rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
.localCheckpoint()
// do sth here and later
rdd.unpersist()
这使得 spark 正确截断沿袭,然后您可以安全地 unpersist() 它而不用担心未清理的引用。
请参考 spark doc 了解如何正确截断执行计划沿袭:https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-checkpointing.html
我有一个循环操作,它生成一些 RDD,进行重新分区,然后进行 aggregatebykey 操作。循环运行s次后,计算出一个最终的RDD,缓存并checkpoint,也作为下一次循环的初始RDD。
这些 RDD 非常大,在每次迭代到达最终 RDD 之前会生成大量中间随机块。我正在压缩我的 shuffle 并允许 shuffle 溢出到磁盘。
我注意到我的工作机器上存储随机文件的工作目录没有被清理。因此最终我 运行 出磁盘 space。我的印象是,如果我检查我的 RDD,它会删除所有中间洗牌块。然而,这似乎没有发生。有没有人知道我如何在每次循环迭代后清理我的随机播放块,或者为什么我的随机播放块没有被清理?
一旦您将 RDD 缓存到您的 memory/disk,只要 spark 上下文存在,RDD 就会存储在您的 memory/disk。
为了告诉驱动程序它可以从 memory/disk 中删除 RDD,您需要使用 unpersist()
函数。
From the
java-doc
:/** * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. * * @param blocking Whether to block until all blocks are deleted. * @return This RDD. */ def unpersist(blocking: Boolean = true)
所以你可以使用:
rdd.unpersist()
取决于您是否在这些 RDD 之间存在依赖关系。例如:
val rdd2 = rdd1.<transformation>
val rdd3 = rdd2.<transformation>
...
在这种情况下,spark 会记住沿袭,并且总会有对旧 RDD 的引用,这使得它不会被 spark driver 选择清理(spark rdd clean up is down by gc on spark driver回收一些不再引用的 rdd 引用)。
所以 persist() 在这种情况下不起作用,唯一的方法是使用 localCheckpoint()
。这是我以前做过的并且为我工作的:
rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
.localCheckpoint()
// do sth here and later
rdd.unpersist()
这使得 spark 正确截断沿袭,然后您可以安全地 unpersist() 它而不用担心未清理的引用。
请参考 spark doc 了解如何正确截断执行计划沿袭:https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-checkpointing.html