节点上的 Spark 分区 foreach 分区
Spark partition on nodes foreachpartition
我有一个带有 master 和 4 个 worker(2 个 preemtible)的 spark 集群 (DataProc),在我的代码中我有这样的东西:
JavaRDD<Signal> rdd_data = javaSparkContext.parallelize(myArray);
rdd_data.foreachPartition(partitionOfRecords -> {
while (partitionOfRecords.hasNext()) {
MyData d = partitionOfRecords.next();
LOG.info("my data: " + d.getId().toString());
}
})
myArray 由 1200 个 MyData 对象组成。
不明白为什么spark只用2核,把我的array分成2个partition,不用16核。
我需要设置分区数?
在此先感谢您的帮助。
通常,将分区数指定为 parallelize
的第二个参数总是一个好主意,因为数据集的最佳切片实际上应该独立于您正在使用的集群的特定形状, Spark 最多可以使用当前大小的执行程序作为 "hint".
您在这里看到的是 Spark 将默认要求 taskScheduler
启用 current number of executor cores to use as the defaultParallelism, combined with the fact that in Dataproc Spark dynamic allocation。动态分配很重要,因为否则提交到集群的单个作业可能只指定最大执行程序,即使它处于空闲状态,然后它将阻止其他作业能够使用这些空闲资源。
所以在 Dataproc 上,如果您使用默认的 n1-standard-4,Dataproc 会为每台机器配置 2 个执行程序,并为每个执行程序提供 2 个核心。 spark.dynamicAllocation.minExecutors
的值应该是 1
,因此您的默认作业在启动时没有做任何工作,将位于 1 个具有 2 个内核的执行程序上。然后 taskScheduler
会报告当前总共保留了 2 个核心,因此 defaultParallelism 将为 2。
如果您有一个大型集群并且您已经 运行 了一段时间的工作(例如,您有一个运行时间超过 60 秒的映射阶段),您会期望动态分配已占用所有可用资源,因此使用 defaultParallelism 的作业的下一步大概是 16,这是集群上的总内核(或者可能是 14,如果 2 个被 appmaster 使用)。
在实践中,您可能希望并行化到比可用内核总数更多的分区。然后,如果每个元素的处理时间有任何偏差,您可以在快速任务完成的地方进行很好的平衡,然后那些执行者可以开始使用新分区,而慢速任务仍然 运行,而不是总是必须等待一个最慢的分区完成。通常选择的分区数量从 2 倍的可用内核数量到 100 倍或更多。
这是另一个相关的 Whosebug 问题:spark.default.parallelism for Parallelize RDD defaults to 2 for spark submit
我有一个带有 master 和 4 个 worker(2 个 preemtible)的 spark 集群 (DataProc),在我的代码中我有这样的东西:
JavaRDD<Signal> rdd_data = javaSparkContext.parallelize(myArray);
rdd_data.foreachPartition(partitionOfRecords -> {
while (partitionOfRecords.hasNext()) {
MyData d = partitionOfRecords.next();
LOG.info("my data: " + d.getId().toString());
}
})
myArray 由 1200 个 MyData 对象组成。 不明白为什么spark只用2核,把我的array分成2个partition,不用16核。 我需要设置分区数?
在此先感谢您的帮助。
通常,将分区数指定为 parallelize
的第二个参数总是一个好主意,因为数据集的最佳切片实际上应该独立于您正在使用的集群的特定形状, Spark 最多可以使用当前大小的执行程序作为 "hint".
您在这里看到的是 Spark 将默认要求 taskScheduler
启用 current number of executor cores to use as the defaultParallelism, combined with the fact that in Dataproc Spark dynamic allocation。动态分配很重要,因为否则提交到集群的单个作业可能只指定最大执行程序,即使它处于空闲状态,然后它将阻止其他作业能够使用这些空闲资源。
所以在 Dataproc 上,如果您使用默认的 n1-standard-4,Dataproc 会为每台机器配置 2 个执行程序,并为每个执行程序提供 2 个核心。 spark.dynamicAllocation.minExecutors
的值应该是 1
,因此您的默认作业在启动时没有做任何工作,将位于 1 个具有 2 个内核的执行程序上。然后 taskScheduler
会报告当前总共保留了 2 个核心,因此 defaultParallelism 将为 2。
如果您有一个大型集群并且您已经 运行 了一段时间的工作(例如,您有一个运行时间超过 60 秒的映射阶段),您会期望动态分配已占用所有可用资源,因此使用 defaultParallelism 的作业的下一步大概是 16,这是集群上的总内核(或者可能是 14,如果 2 个被 appmaster 使用)。
在实践中,您可能希望并行化到比可用内核总数更多的分区。然后,如果每个元素的处理时间有任何偏差,您可以在快速任务完成的地方进行很好的平衡,然后那些执行者可以开始使用新分区,而慢速任务仍然 运行,而不是总是必须等待一个最慢的分区完成。通常选择的分区数量从 2 倍的可用内核数量到 100 倍或更多。
这是另一个相关的 Whosebug 问题:spark.default.parallelism for Parallelize RDD defaults to 2 for spark submit