Spark:增量收集()到分区导致堆中的OutOfMemory

Spark: Incremental collect() to a partition causes OutOfMemory in Heap

我有以下代码。本质上,我需要将 RDD 打印到控制台,因此我通过按分区收集大 RDD,将其分成较小的块。这是为了避免一次收集整个 RDD。在监视堆和 GC 日志时,似乎什么都没有被 GC。堆不断增长,直到遇到 OutOfMemory 错误。如果我的理解是正确的,在下面,一旦对收集的 RDD 执行 println 语句,就不需要它们,因此对 GC 来说是安全的,但这不是我在 GC 日志中看到的,每次调用 collect 都会累积到 OOM。有谁知道为什么收集的数据没有被 GC 处理?

 val writes = partitions.foreach { partition =>
      val rddPartition = rdds.mapPartitionsWithIndex ({ 
        case (index, data) => if (index == partition.index) data else Iterator[Words]()
      }, false).collect().toSeq
      val partialReport = Report(rddPartition, reportId, dateCreated)
      println(partialReport.name) 
    }

collect() 创建一个包含 RDD 的所有元素的数组。

在完全创建之前不能进行垃圾回收!因此 OOM。

可以根据 Report 实际执行的操作来解决它。

如果您的数据集很大,主节点很可能无法处理它而关闭。您可以尝试将它们写入文件(例如 saveAsTextFile),然后再次读取每个文件