默认情况下,Spark Dataframe 是如何分区的?

How is a Spark Dataframe partitioned by default?

我知道使用 HashPartitioner 根据键值对 RDD 进行分区。但是默认情况下,Spark Dataframe 是如何分区的,因为它没有 key/value.

的概念

Dataframe 根据 运行 创建它的任务数量进行分区。

没有应用“默认”分区逻辑。以下是如何设置分区的一些示例:

  • 通过 val df = Seq(1 to 500000: _*).toDF() 创建的 Dataframe 将只有一个分区。
  • 通过 val df = spark.range(0,100).toDF() 创建的 Dataframe 具有与可用核心数量一样多的分区(例如,当您的 master 设置为 local[4] 时为 4 个)。另外,请参阅下面关于“默认并行度”的评论,该评论对没有父 RDD 的 parallelize 等操作生效。
  • 从 RDD (spark.createDataFrame(rdd, schema)) 派生的 Dataframe 将具有与底层 RDD 相同数量的分区。在我的例子中,因为我在本地有 6 个内核,所以 RDD 创建了 6 个分区。
  • 从 Kafka 主题消费的 Dataframe 将具有与主题分区匹配的分区数量,因为它可以使用与主题拥有的分区一样多的 cores/slots 来消费主题。
  • 通过读取文件创建的数据框,例如来自 HDFS 的分区数量将与文件匹配,除非必须根据 spark.sql.files.maxPartitionBytes 将单个文件拆分为多个分区,默认为 128MB。
  • 从需要洗牌的转换派生的 Dataframe 将具有由 spark.sql.shuffle.partitions 设置的可配置分区数量(默认为 200)。
  • ...

RDD 和 Structured API 之间的主要区别之一是您对分区的控制不如 RDD,您甚至可以定义自定义分区程序。这对于 Dataframes 是不可能的。

默认并行度

Execution Behavior 配置的文档 spark.default.parallelism 解释:

For operations like parallelize with no parent RDDs, it depends on the cluster manager:

Local mode: number of cores on the local machine

Mesos fine grained mode: 8

Others: total number of cores on all executor nodes or 2, whichever is larger