减少任务 Spark

Number reduce tasks Spark

Spark计算reduce任务数的公式是什么?

我是 运行 几个 spark-sql 查询,reduce 任务的数量始终是 200。这些查询的 map 任务的数量是 154。我在 Spark 1.4 上。 1.

这是否与spark.shuffle.sort.bypassMergeThreshold有关,默认为200

您要找的是 spark.sql.shuffle.partitions。根据 Spark SQL performance tuning guide:

| Property Name                 | Default | Meaning                                        |
+-------------------------------+---------+------------------------------------------------+
| spark.sql.shuffle.partitions  | 200     | Configures the number of partitions to use     |
|                               |         | when shuffling data for joins or aggregations. |

另一个相关的选项是 spark.default.parallelism,它决定了 'default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user',但是这似乎被 Spark SQL 忽略了,并且只在处理普通 RDD 时才相关。

是的,@svgd,这是正确的参数。以下是您如何在 Scala 中重置它:

// Set number of shuffle partitions to 3
sqlContext.setConf("spark.sql.shuffle.partitions", "3")
// Verify the setting 
sqlContext.getConf("spark.sql.shuffle.partitions")

如今在 Spark 2 + 中设置此参数执行以下操作

spark.conf.set("spark.sql.shuffle.partitions", 16)

通过 mapreduce.input.fileinputformat.split 指定最小和最大拆分大小应该会有所帮助。这些参数决定了将输入文件分割成的最小和最大块大小。

val spark = SparkSession.builder
    .config("mapreduce.input.fileinputformat.split.minsize", "1073741824")
    .config("mapreduce.input.fileinputformat.split.maxsize", "1073741824")           
    .enableHiveSupport().getOrCreate()

这里,分割大小一直保持1GB(1073741824字节)。 要记住 parquet、snappy 是可拆分的,而 gzip、lzo 不是。参考更多 here.