Spark:增量收集()到分区导致堆中的OutOfMemory
Spark: Incremental collect() to a partition causes OutOfMemory in Heap
我有以下代码。本质上,我需要将 RDD 打印到控制台,因此我通过按分区收集大 RDD,将其分成较小的块。这是为了避免一次收集整个 RDD。在监视堆和 GC 日志时,似乎什么都没有被 GC。堆不断增长,直到遇到 OutOfMemory 错误。如果我的理解是正确的,在下面,一旦对收集的 RDD 执行 println 语句,就不需要它们,因此对 GC 来说是安全的,但这不是我在 GC 日志中看到的,每次调用 collect 都会累积到 OOM。有谁知道为什么收集的数据没有被 GC 处理?
val writes = partitions.foreach { partition =>
val rddPartition = rdds.mapPartitionsWithIndex ({
case (index, data) => if (index == partition.index) data else Iterator[Words]()
}, false).collect().toSeq
val partialReport = Report(rddPartition, reportId, dateCreated)
println(partialReport.name)
}
collect()
创建一个包含 RDD 的所有元素的数组。
在完全创建之前不能进行垃圾回收!因此 OOM。
可以根据 Report
实际执行的操作来解决它。
如果您的数据集很大,主节点很可能无法处理它而关闭。您可以尝试将它们写入文件(例如 saveAsTextFile),然后再次读取每个文件
我有以下代码。本质上,我需要将 RDD 打印到控制台,因此我通过按分区收集大 RDD,将其分成较小的块。这是为了避免一次收集整个 RDD。在监视堆和 GC 日志时,似乎什么都没有被 GC。堆不断增长,直到遇到 OutOfMemory 错误。如果我的理解是正确的,在下面,一旦对收集的 RDD 执行 println 语句,就不需要它们,因此对 GC 来说是安全的,但这不是我在 GC 日志中看到的,每次调用 collect 都会累积到 OOM。有谁知道为什么收集的数据没有被 GC 处理?
val writes = partitions.foreach { partition =>
val rddPartition = rdds.mapPartitionsWithIndex ({
case (index, data) => if (index == partition.index) data else Iterator[Words]()
}, false).collect().toSeq
val partialReport = Report(rddPartition, reportId, dateCreated)
println(partialReport.name)
}
collect()
创建一个包含 RDD 的所有元素的数组。
在完全创建之前不能进行垃圾回收!因此 OOM。
可以根据 Report
实际执行的操作来解决它。
如果您的数据集很大,主节点很可能无法处理它而关闭。您可以尝试将它们写入文件(例如 saveAsTextFile),然后再次读取每个文件