rdd.repartition() 和 sc.parallelize(data, partitions) 中的分区大小有什么区别
what is the difference between rdd.repartition() and partition size in sc.parallelize(data, partitions)
我正在浏览 spark 的文档。我对 rdd.repartition() 函数和我们在 sc.parallelize().
上下文初始化期间传递的分区数有点困惑
我的机器上有 4 个内核,如果我 sc.parallelize(data, 4) 一切正常,但是当我 rdd.repartition(4) 并应用 rdd.mappartitions(fun)有时分区没有数据,在这种情况下我的功能会失败。
所以,只是想了解这两种分区方式之间的区别。
通过调用 repartition(N)
,spark 将进行随机播放以更改分区数(默认情况下会生成具有该分区数的 HashPartitioner)。当您使用所需数量的分区调用 sc.parallelize
时,它会将您的数据(或多或少)平均分配给切片(实际上类似于范围分区器),您可以在 ParallelCollectionRDD
内部看到这一点slice
函数。
话虽如此,sc.parallelize(data, N)
和 rdd.reparitition(N)
(以及几乎任何形式的数据读取)都有可能导致 RDD 具有空分区(这是很常见的mapPartitions
代码的错误来源,所以我偏向于 spark-testing-base 中的 RDD 生成器来创建具有空分区的 RDD)。对于大多数函数来说,一个非常简单的修复方法就是检查你是否传入了一个空迭代器,并在这种情况下返回一个空迭代器。
我正在浏览 spark 的文档。我对 rdd.repartition() 函数和我们在 sc.parallelize().
上下文初始化期间传递的分区数有点困惑我的机器上有 4 个内核,如果我 sc.parallelize(data, 4) 一切正常,但是当我 rdd.repartition(4) 并应用 rdd.mappartitions(fun)有时分区没有数据,在这种情况下我的功能会失败。
所以,只是想了解这两种分区方式之间的区别。
通过调用 repartition(N)
,spark 将进行随机播放以更改分区数(默认情况下会生成具有该分区数的 HashPartitioner)。当您使用所需数量的分区调用 sc.parallelize
时,它会将您的数据(或多或少)平均分配给切片(实际上类似于范围分区器),您可以在 ParallelCollectionRDD
内部看到这一点slice
函数。
话虽如此,sc.parallelize(data, N)
和 rdd.reparitition(N)
(以及几乎任何形式的数据读取)都有可能导致 RDD 具有空分区(这是很常见的mapPartitions
代码的错误来源,所以我偏向于 spark-testing-base 中的 RDD 生成器来创建具有空分区的 RDD)。对于大多数函数来说,一个非常简单的修复方法就是检查你是否传入了一个空迭代器,并在这种情况下返回一个空迭代器。