减少任务 Spark
Number reduce tasks Spark
Spark计算reduce任务数的公式是什么?
我是 运行 几个 spark-sql 查询,reduce 任务的数量始终是 200。这些查询的 map 任务的数量是 154。我在 Spark 1.4 上。 1.
这是否与spark.shuffle.sort.bypassMergeThreshold有关,默认为200
您要找的是 spark.sql.shuffle.partitions
。根据 Spark SQL performance tuning guide:
| Property Name | Default | Meaning |
+-------------------------------+---------+------------------------------------------------+
| spark.sql.shuffle.partitions | 200 | Configures the number of partitions to use |
| | | when shuffling data for joins or aggregations. |
另一个相关的选项是 spark.default.parallelism
,它决定了 'default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user',但是这似乎被 Spark SQL 忽略了,并且只在处理普通 RDD 时才相关。
是的,@svgd,这是正确的参数。以下是您如何在 Scala 中重置它:
// Set number of shuffle partitions to 3
sqlContext.setConf("spark.sql.shuffle.partitions", "3")
// Verify the setting
sqlContext.getConf("spark.sql.shuffle.partitions")
如今在 Spark 2 + 中设置此参数执行以下操作
spark.conf.set("spark.sql.shuffle.partitions", 16)
通过 mapreduce.input.fileinputformat.split
指定最小和最大拆分大小应该会有所帮助。这些参数决定了将输入文件分割成的最小和最大块大小。
val spark = SparkSession.builder
.config("mapreduce.input.fileinputformat.split.minsize", "1073741824")
.config("mapreduce.input.fileinputformat.split.maxsize", "1073741824")
.enableHiveSupport().getOrCreate()
这里,分割大小一直保持1GB(1073741824字节)。
要记住 parquet、snappy 是可拆分的,而 gzip、lzo 不是。参考更多 here.
Spark计算reduce任务数的公式是什么?
我是 运行 几个 spark-sql 查询,reduce 任务的数量始终是 200。这些查询的 map 任务的数量是 154。我在 Spark 1.4 上。 1.
这是否与spark.shuffle.sort.bypassMergeThreshold有关,默认为200
您要找的是 spark.sql.shuffle.partitions
。根据 Spark SQL performance tuning guide:
| Property Name | Default | Meaning |
+-------------------------------+---------+------------------------------------------------+
| spark.sql.shuffle.partitions | 200 | Configures the number of partitions to use |
| | | when shuffling data for joins or aggregations. |
另一个相关的选项是 spark.default.parallelism
,它决定了 'default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user',但是这似乎被 Spark SQL 忽略了,并且只在处理普通 RDD 时才相关。
是的,@svgd,这是正确的参数。以下是您如何在 Scala 中重置它:
// Set number of shuffle partitions to 3
sqlContext.setConf("spark.sql.shuffle.partitions", "3")
// Verify the setting
sqlContext.getConf("spark.sql.shuffle.partitions")
如今在 Spark 2 + 中设置此参数执行以下操作
spark.conf.set("spark.sql.shuffle.partitions", 16)
通过 mapreduce.input.fileinputformat.split
指定最小和最大拆分大小应该会有所帮助。这些参数决定了将输入文件分割成的最小和最大块大小。
val spark = SparkSession.builder
.config("mapreduce.input.fileinputformat.split.minsize", "1073741824")
.config("mapreduce.input.fileinputformat.split.maxsize", "1073741824")
.enableHiveSupport().getOrCreate()
这里,分割大小一直保持1GB(1073741824字节)。 要记住 parquet、snappy 是可拆分的,而 gzip、lzo 不是。参考更多 here.