Spark 清理溢出到磁盘的随机播放

Spark clean up shuffle spilled to disk

我有一个循环操作,它生成一些 RDD,进行重新分区,然后进行 aggregatebykey 操作。循环运行s次后,计算出一个最终的RDD,缓存并checkpoint,也作为下一次循环的初始RDD。

这些 RDD 非常大,在每次迭代到达最终 RDD 之前会生成大量中间随机块。我正在压缩我的 shuffle 并允许 shuffle 溢出到磁盘。

我注意到我的工作机器上存储随机文件的工作目录没有被清理。因此最终我 运行 出磁盘 space。我的印象是,如果我检查我的 RDD,它会删除所有中间洗牌块。然而,这似乎没有发生。有没有人知道我如何在每次循环迭代后清理我的随机播放块,或者为什么我的随机播放块没有被清理?

一旦您将 RDD 缓存到您的 memory/disk,只要 spark 上下文存在,RDD 就会存储在您的 memory/disk。

为了告诉驱动程序它可以从 memory/disk 中删除 RDD,您需要使用 unpersist() 函数。

From the java-doc:

 /**
   * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
   *
   * @param blocking Whether to block until all blocks are deleted.
   * @return This RDD.
   */
  def unpersist(blocking: Boolean = true)

所以你可以使用:

rdd.unpersist()

取决于您是否在这些 RDD 之间存在依赖关系。例如:

val rdd2 = rdd1.<transformation>
val rdd3 = rdd2.<transformation>
...

在这种情况下,spark 会记住沿袭,并且总会有对旧 RDD 的引用,这使得它不会被 spark driver 选择清理(spark rdd clean up is down by gc on spark driver回收一些不再引用的 rdd 引用)。
所以 persist() 在这种情况下不起作用,唯一的方法是使用 localCheckpoint()。这是我以前做过的并且为我工作的:

rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
          .localCheckpoint()
// do sth here and later
rdd.unpersist()

这使得 spark 正确截断沿袭,然后您可以安全地 unpersist() 它而不用担心未清理的引用。
请参考 spark doc 了解如何正确截断执行计划沿袭:https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-checkpointing.html