Apache Spark 卡在 "loading" 状态
Apache Spark stuck in "loading" state
这是我 运行 来自 Spark shell 的工作:
val l = sc.parallelize((1 to 5000000).toList)
val m = l.map(m => m*23)
m.take(5000000)
工人似乎处于 "LOADING" 状态:
什么是 "LOADING" 状态?
更新:
据我所知,take
将在集群上执行作业,然后 return 将结果发送给驱动程序。所以 "LOADING" 状态等同于加载到驱动程序上的数据?
我相信如果你这样做,
(1 to 5000000 ).toList
你一定会遇到java.lang.OutOfMemoryError: GC overhead limit exceeded
.
当 JVM 意识到它在 Grabage Collection
中花费了太多时间时,就会发生这种情况。默认情况下,如果您在 GC
中花费 more than 98% of the total time
并且在恢复 heap
的 GC
less than 2%
之后,JVM 配置为抛出此错误。
在这种特殊情况下,您要为每次迭代创建 new instance
of List
,( immutability
,所以每次 new instance
of List
是回来 )。这意味着每次迭代都会留下一个无用的 List
实例,对于大小为数百万的 List
,它将占用大量内存并非常频繁地触发 GC
。此外,每次 GC
都必须释放大量内存,因此需要很多时间。
这最终会导致错误 - java.lang.OutOfMemoryError: GC overhead limit exceeded
。
如果不存在会发生什么? -> 这意味着 GC 能够清理的少量数据将很快再次被填满,从而迫使 GC 重新启动清理过程again.This 形成了一个恶性循环,其中 CPU 100% 忙于 GC,无法完成任何实际工作。应用程序将面临极度缓慢 - 过去在几毫秒内完成的操作现在可能需要几分钟才能完成。
这是在 JVM 中实现的先发制人的快速失败保护措施。
您可以使用以下 Java 选项禁用此保护措施。
-XX:-UseGCOverheadLimit
但我强烈建议不要这样做。
即使您禁用此功能(或者如果您的 Spark-Cluster 通过分配大堆 space 来避免此 to some extent
),
(1 to 5000000 ).toList
需要很长时间。
此外,我有一种强烈的感觉,像 Spark
这样应该是 运行 多个作业的系统被配置(默认情况下,你可以覆盖)到 pause
或 reject
这样的作业一旦实现极端 GC 就会导致其他作业饥饿。这可能是您的工作总是加载的主要原因。
您可以通过使用 mutable List
并使用 for 循环向其附加值来减轻很多负担。现在您可以并行化您的可变列表。
val mutableList = scala.collection.mutable.MutableList.empty[ Int ]
for ( i <- 1 to 5000000 ) {
mutableList.append( i )
}
val l = sc.parallelize( mutableList )
但即便如此,每当 List
为 half-full
时,也会导致多次(但很多时候不那么严重)内存分配(因此 GC 执行),从而导致整个 memory relocation
List
之前分配的内存的两倍。
这是我 运行 来自 Spark shell 的工作:
val l = sc.parallelize((1 to 5000000).toList)
val m = l.map(m => m*23)
m.take(5000000)
工人似乎处于 "LOADING" 状态:
什么是 "LOADING" 状态?
更新:
据我所知,take
将在集群上执行作业,然后 return 将结果发送给驱动程序。所以 "LOADING" 状态等同于加载到驱动程序上的数据?
我相信如果你这样做,
(1 to 5000000 ).toList
你一定会遇到java.lang.OutOfMemoryError: GC overhead limit exceeded
.
当 JVM 意识到它在 Grabage Collection
中花费了太多时间时,就会发生这种情况。默认情况下,如果您在 GC
中花费 more than 98% of the total time
并且在恢复 heap
的 GC
less than 2%
之后,JVM 配置为抛出此错误。
在这种特殊情况下,您要为每次迭代创建 new instance
of List
,( immutability
,所以每次 new instance
of List
是回来 )。这意味着每次迭代都会留下一个无用的 List
实例,对于大小为数百万的 List
,它将占用大量内存并非常频繁地触发 GC
。此外,每次 GC
都必须释放大量内存,因此需要很多时间。
这最终会导致错误 - java.lang.OutOfMemoryError: GC overhead limit exceeded
。
如果不存在会发生什么? -> 这意味着 GC 能够清理的少量数据将很快再次被填满,从而迫使 GC 重新启动清理过程again.This 形成了一个恶性循环,其中 CPU 100% 忙于 GC,无法完成任何实际工作。应用程序将面临极度缓慢 - 过去在几毫秒内完成的操作现在可能需要几分钟才能完成。
这是在 JVM 中实现的先发制人的快速失败保护措施。
您可以使用以下 Java 选项禁用此保护措施。
-XX:-UseGCOverheadLimit
但我强烈建议不要这样做。
即使您禁用此功能(或者如果您的 Spark-Cluster 通过分配大堆 space 来避免此 to some extent
),
(1 to 5000000 ).toList
需要很长时间。
此外,我有一种强烈的感觉,像 Spark
这样应该是 运行 多个作业的系统被配置(默认情况下,你可以覆盖)到 pause
或 reject
这样的作业一旦实现极端 GC 就会导致其他作业饥饿。这可能是您的工作总是加载的主要原因。
您可以通过使用 mutable List
并使用 for 循环向其附加值来减轻很多负担。现在您可以并行化您的可变列表。
val mutableList = scala.collection.mutable.MutableList.empty[ Int ]
for ( i <- 1 to 5000000 ) {
mutableList.append( i )
}
val l = sc.parallelize( mutableList )
但即便如此,每当 List
为 half-full
时,也会导致多次(但很多时候不那么严重)内存分配(因此 GC 执行),从而导致整个 memory relocation
List
之前分配的内存的两倍。