如何在不产生 .rdd 成本的情况下检查 Spark DataFrame 的分区数

How to check the number of partitions of a Spark DataFrame without incurring the cost of .rdd

关于如何获得 n RDD 和或 DataFrame 的分区数有很多问题:答案总是:

 rdd.getNumPartitions

 df.rdd.getNumPartitions

不幸的是,这是对 DataFrame 昂贵 操作,因为

 df.rdd

需要从 DataFrame 转换为 rdd。这是按照 运行

所需时间的顺序
 df.count

我正在编写 可选 repartitioncoalesce 的逻辑 DataFrame - 基于 current 分区数在 acceptable 值范围内,或者低于或高于它们。

  def repartition(inDf: DataFrame, minPartitions: Option[Int],
       maxPartitions: Option[Int]): DataFrame = {
    val inputPartitions= inDf.rdd.getNumPartitions  // EXPENSIVE!
    val outDf = minPartitions.flatMap{ minp =>
      if (inputPartitions < minp) {
        info(s"Repartition the input from $inputPartitions to $minp partitions..")
        Option(inDf.repartition(minp))
      } else {
        None
      }
    }.getOrElse( maxPartitions.map{ maxp =>
      if (inputPartitions > maxp) {
        info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
        inDf.coalesce(maxp)
      } else inDf
    }.getOrElse(inDf))
    outDf
  }

但是我们负担不起以这种方式每个DataFramerdd.getNumPartitions的成本。

有没有办法获取此信息 - 例如从 online/temporary catalog 中查询 registered table 也许?

更新 Spark GUI 显示 DataFrame.rdd 操作与作业中最长的 sql 一样长。我将重新运行作业并在此处附上屏幕截图。

以下只是一个测试用例:它使用的是生产环境中数据大小的一小部分。最长的 sql 只有五分钟 - 而这个 也是 (请注意 sqlnot 在这里帮了大忙:它还必须随后执行,因此有效地将累积执行时间加倍。

我们可以看到 DataFrameUtils 第 30 行的 .rdd 操作(如上面的代码片段所示)需要 5.1 分钟 - 然而 save 操作 仍然 5.2 分钟后 - 即根据后续 save.

的执行时间,我们 not 通过执行 .rdd 来节省任何时间

rdd.getNumPartitions 中的 rdd 组件没有固有成本,因为从未评估返回的 RDD

虽然您可以根据经验轻松确定这一点,但可以使用调试器(我将把它留作 reader 的练习),或者确定在基本情况下不会触发任何作业

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val ds = spark.read.text("README.md")
ds: org.apache.spark.sql.DataFrame = [value: string]

scala> ds.rdd.getNumPartitions
res0: Int = 1

scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
res1: Boolean = true

可能不足以说服您。因此,让我们以更系统的方式来解决这个问题:

  • rdd returns a MapPartitionRDDds 如上定义):

    scala> ds.rdd.getClass
    res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD
    
  • RDD.getNumPartitions invokes RDD.partitions.

  • 在non-checkpointed场景RDD.partitions invokes getPartitions中(也可以随意跟踪检查点路径)。
  • RDD.getPartitions is abstract.
  • 所以在这种情况下实际使用的实现是MapPartitionsRDD.getPartitions,也就是delegates the call to the parent.
  • rdd和来源之间只有MapPartitionsRDD

    scala> ds.rdd.toDebugString
    res3: String =
    (1) MapPartitionsRDD[3] at rdd at <console>:26 []
     |  MapPartitionsRDD[2] at rdd at <console>:26 []
     |  MapPartitionsRDD[1] at rdd at <console>:26 []
     |  FileScanRDD[0] at rdd at <console>:26 []
    

    类似地,如果 Dataset 包含交换,我们将跟随 parents 到最近的洗牌:

    scala> ds.orderBy("value").rdd.toDebugString
    res4: String =
    (67) MapPartitionsRDD[13] at rdd at <console>:26 []
     |   MapPartitionsRDD[12] at rdd at <console>:26 []
     |   MapPartitionsRDD[11] at rdd at <console>:26 []
     |   ShuffledRowRDD[10] at rdd at <console>:26 []
     +-(1) MapPartitionsRDD[9] at rdd at <console>:26 []
        |  MapPartitionsRDD[5] at rdd at <console>:26 []
        |  FileScanRDD[4] at rdd at <console>:26 []
    

    注意这个case特别有意思,因为我们实际触发了一个job:

    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
    res5: Boolean = false
    
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
    res6: Array[Int] = Array(0)
    

    那是因为我们遇到过无法静态确定分区的情况(参见 and )。

    在这种情况下getNumPartitions也会触发一个作业:

    scala> ds.orderBy("value").rdd.getNumPartitions
    res7: Int = 67
    
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)  // Note new job id
    res8: Array[Int] = Array(1, 0)
    

    但这并不意味着观察到的成本与 .rdd 调用有某种关联。相反,如果没有静态公式(例如某些 Hadoop 输入格式,需要对数据进行全面扫描),找到 partitions 的内在成本。

请注意,此处提出的观点不应外推到 Dataset.rdd 的其他应用。例如 ds.rdd.count 确实是昂贵和浪费的。

根据我的经验,df.rdd.getNumPartitions 非常快,我从未遇到过超过一秒左右的情况。

或者,您也可以尝试

val numPartitions: Long = df
      .select(org.apache.spark.sql.functions.spark_partition_id()).distinct().count()

这将避免使用 .rdd