Why do I get the error "Size exceeds Integer.MAX_VALUE" when using Spark + Cassandra?
I have 7 Cassandra nodes (5 nodes with 32 cores and 32G memory, and 4 nodes with 4 cores and 64G memory), with Spark workers deployed on this cluster and the Spark master on an 8th node. I use the spark-cassandra-connector for them. My Cassandra cluster now holds nearly 1 billion records with 30 fields each, and my Scala code includes the following snippet:
def startOneCache(): DataFrame = {
  val conf = new SparkConf(true)
    .set("spark.cassandra.connection.host", "192.168.0.184")
    .set("spark.cassandra.auth.username", "username")
    .set("spark.cassandra.auth.password", "password")
    .set("spark.driver.maxResultSize", "4G")
    .set("spark.executor.memory", "12G")
    .set("spark.cassandra.input.split.size_in_mb", "64")
  val sc = new SparkContext("spark://192.168.0.131:7077", "statistics", conf)
  val cc = new CassandraSQLContext(sc)
  val rdd: DataFrame = cc.sql(
    "select user_id,col1,col2,col3,col4,col5,col6,col7,col8 from user_center.users"
  ).limit(100000192)
  val rdd_cache: DataFrame = rdd.cache()
  rdd_cache.count()
  return rdd_cache
}
On the Spark master node I run the code above with spark-submit. When the statement rdd_cache.count() executes, I get an ERROR on one of the worker nodes (192.168.0.185):
16/03/08 15:38:57 INFO ShuffleBlockFetcherIterator: Started 4 remote fetches in 221 ms
16/03/08 15:43:49 WARN MemoryStore: Not enough space to cache rdd_6_0 in memory! (computed 4.6 GB so far)
16/03/08 15:43:49 INFO MemoryStore: Memory use = 61.9 KB (blocks) + 4.6 GB (scratch space shared across 1 tasks(s)) = 4.6 GB. Storage limit = 6.2 GB.
16/03/08 15:43:49 WARN CacheManager: Persisting partition rdd_6_0 to disk instead.
16/03/08 16:13:11 ERROR Executor: Managed memory leak detected; size = 4194304 bytes, TID = 24002
16/03/08 16:13:11 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 24002)
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
I naively assumed that the final error, Size exceeds Integer.MAX_VALUE, was caused by the earlier warning 16/03/08 15:43:49 WARN MemoryStore: Not enough space to cache rdd_6_0 in memory! (computed 4.6 GB so far), but I don't know why, or whether I should set something larger than .set("spark.executor.memory", "12G"). What should I do to fix this?
No Spark shuffle block can be greater than 2 GB.
Spark uses ByteBuffer as the abstraction for storing a block, and a block's size is therefore limited by Integer.MAX_VALUE (about 2 billion bytes, i.e. just under 2 GB).
A low number of partitions can lead to oversized shuffle blocks. To address this, try increasing the number of partitions using rdd.repartition() or rdd.coalesce(); see the sketch below.
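As a minimal sketch applied to the question's code (the partition count of 1000 is an illustrative guess, not a tuned value), the DataFrame could be repartitioned before caching so that no single cached or shuffled block approaches the 2 GB limit:

// Hypothetical sketch: repartition the query result before caching.
// Pick the partition count so that (total data size / partitions) stays
// at a few hundred MB per partition at most.
val rdd: DataFrame = cc.sql(
  "select user_id,col1,col2,col3,col4,col5,col6,col7,col8 from user_center.users"
).limit(100000192)
  .repartition(1000)          // many smaller partitions instead of a few huge ones
val rdd_cache: DataFrame = rdd.cache()
rdd_cache.count()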
If that does not help, it means at least one partition is still too large, and you may need a more sophisticated approach to shrink it, for example using randomness to even out the distribution of RDD data across partitions ("salting"); a minimal sketch of that idea follows.
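A minimal salting sketch, assuming Spark 1.6+ where DataFrame.repartition(numPartitions, cols) is available; the salt column name, bucket count, and partition count are illustrative, and rdd here is the DataFrame from the question:

import org.apache.spark.sql.functions.{col, rand}

// Hypothetical sketch: add a random bucket column and repartition by it,
// so rows are spread roughly evenly across partitions regardless of key skew.
val salted: DataFrame = rdd
  .withColumn("salt", (rand() * 1000).cast("int"))  // random bucket per row
  .repartition(200, col("salt"))                    // shuffle by the random bucket
  .drop("salt")                                     // narrow op, keeps the layout
salted.cache().count()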