spark.sql.shuffle.partitions 的最佳值应该是多少,或者我们如何在使用 Spark SQL 时增加分区?

What should be the optimal value for spark.sql.shuffle.partitions or how do we increase partitions when using Spark SQL?

我正在使用 Spark SQL 实际上 hiveContext.sql() 它使用分组查询,我 运行 遇到了 OOM 问题。因此,考虑将 spark.sql.shuffle.partitions 的值从默认值 200 增加到 1000,但这无济于事。

我相信此分区将分担数据随机加载,因此分区越多,容纳的数据就越少。我是 Spark 的新手。我正在使用 Spark 1.4.0,我有大约 1TB 的未压缩数据要使用 hiveContext.sql() 按查询分组处理。

好的,我认为您的问题比较笼统。它不是 Spark SQL 特有的,这是 Spark 的一个普遍问题,当文件很少时,它会忽略您告诉它的分区数。 Spark 的分区数似乎与 HDFS 上的文件数相同,除非你调用 repartition。所以调用 repartition 应该可行,但需要注意的是会造成一些不必要的随机播放。

我刚才提过这个问题,现在还没有得到很好的回答:(

Spark: increase number of partitions without causing a shuffle?

如果您 运行 在随机播放时内存不足,请尝试将 spark.sql.shuffle.partitions 设置为 2001。

Spark uses a different data structure for shuffle book-keeping when the number of partitions is greater than 2000:

private[spark] object MapStatus {

  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > 2000) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }
...

我真希望他们能让你独立配置。

对了,我找到了this information in a Cloudera slide deck

这实际上取决于你的数据和你的查询,如果 Spark 必须加载 1Tb,那么你的设计有问题。

使用 superbe web UI 查看 DAG,这意味着 Spark 如何将您的 SQL 查询转换为 jobs/stages 和任务。

有用的指标是 "Input" 和 "Shuffle"。

  • 对数据进行分区(Hive / 目录布局如 /year=X/month=X)
  • 使用 spark CLUSTER BY 功能,按数据分区工作
  • 使用 ORC / Parquet 文件格式,因为它们提供 "Push-down filter",无用的数据不会加载到 Spark
  • 分析 Spark 历史以了解 Spark 如何读取数据

还有,你的驱动会不会出现OOM?

-> 这是另一个问题,驱动程序会在最后收集您想要的数据。如果你要求太多数据,驱动程序会 OOM,尝试限制你的查询,或者写另一个 table (Spark syntax CREATE TABLE ...AS).

我从 Cloudera 了解到 this post 有关 Hive 分区的信息。查看 "Pointers" 部分讨论分区数和每个分区中的文件数导致名称节点过载,这可能导致 OOM。