计算 Spark 数据帧的大小 - SizeEstimator 给出了意想不到的结果

Compute size of Spark dataframe - SizeEstimator gives unexpected results

我正在尝试找到一种可靠的方法来以编程方式计算 Spark 数据帧的大小(以字节为单位)。

原因是我想要一种方法来计算 "optimal" 个分区数("optimal" 在这里可能意味着不同的事情:它可能意味着 having an optimal partition size, or 写入时镶木地板表 - 但两者都可以假设为数据帧大小的某种线性函数)。换句话说,我想在数据帧上调用 coalesce(n)repartition(n),其中 n 不是固定数字,而是数据帧大小的函数。

SO 上的其他主题建议使用 org.apache.spark.util 中的 SizeEstimator.estimate 来获取数据帧的字节大小,但我得到的结果不一致。

首先,我将我的数据帧保存到内存中:

df.cache().count 

Spark UI 在“存储”选项卡中显示大小为 4.8GB。然后,我 运行 以下命令从 SizeEstimator:

获取大小
import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(df)

这给出了 115'715'808 字节的结果 =~ 116MB。但是,将 SizeEstimator 应用于不同的对象会导致截然不同的结果。例如,我尝试分别计算数据框中每一行的大小并将它们相加:

df.map(row => SizeEstimator.estimate(row.asInstanceOf[ AnyRef ])).reduce(_+_)

这导致大小为 12'084'698'256 字节 =~ 12GB。或者,我可以尝试将 SizeEstimator 应用于每个分区:

df.mapPartitions(
    iterator => Seq(SizeEstimator.estimate(
        iterator.toList.map(row => row.asInstanceOf[ AnyRef ]))).toIterator
).reduce(_+_)

这再次导致 10'792'965'376 字节的不同大小 =~ 10.8GB。

我知道涉及内存优化/内存开销,但在执行这些测试后,我看不出如何使用 SizeEstimator 来充分估计数据帧的大小(以及因此分区大小,或生成的 Parquet 文件大小)。

应用 SizeEstimator 的适当方法(如果有)是什么,以便获得对数据帧大小或其分区的良好估计?如果没有,这里建议的方法是什么?

SizeEstimator returns 对象在 JVM 堆上占用的字节数。这包括对象引用的对象,实际对象大小几乎总是小得多。

您观察到的大小差异是因为当您在 JVM 上创建新对象时,引用也会占用内存,并且会计算在内。

在此处查看文档
https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.util.SizeEstimator$

不幸的是,我无法从 SizeEstimator 中获得可靠的估计,但我可以找到另一种策略 - 如果数据帧被缓存,我们可以从 queryExecution 中提取其大小,如下所示:

df.cache.foreach(_ => ())
val catalyst_plan = df.queryExecution.logical
val df_size_in_bytes = spark.sessionState.executePlan(
    catalyst_plan).optimizedPlan.stats.sizeInBytes

对于示例数据帧,这恰好给出了 4.8GB(这也对应于写入未压缩的 Parquet table 时的文件大小)。

缺点是需要缓存数据帧,但在我的情况下这不是问题。

编辑:将 df.cache.foreach(_=>_) 替换为 df.cache.foreach(_ => ()),感谢@DavidBenedeki 在评论中指出。

除了您已经尝试过的尺寸估算器(很好的洞察力)..

下面是另一种选择

RDDInfo[] getRDDStorageInfo()

Return 关于哪些 RDD 被 缓存的信息 ,如果它们在 mem 或两者上,有多少 space 他们拿等等

实际上 spark 存储选项卡使用这个。Spark docs

下面是implementation from spark

 /**
   * :: DeveloperApi ::
   * Return information about what RDDs are cached, if they are in mem or on disk, how much space
   * they take, etc.
   */
  @DeveloperApi
  def getRDDStorageInfo: Array[RDDInfo] = {
    getRDDStorageInfo(_ => true)
  }

  private[spark] def getRDDStorageInfo(filter: RDD[_] => Boolean): Array[RDDInfo] = {
    assertNotStopped()
    val rddInfos = persistentRdds.values.filter(filter).map(RDDInfo.fromRdd).toArray
    rddInfos.foreach { rddInfo =>
      val rddId = rddInfo.id
      val rddStorageInfo = statusStore.asOption(statusStore.rdd(rddId))
      rddInfo.numCachedPartitions = rddStorageInfo.map(_.numCachedPartitions).getOrElse(0)
      rddInfo.memSize = rddStorageInfo.map(_.memoryUsed).getOrElse(0L)
      rddInfo.diskSize = rddStorageInfo.map(_.diskUsed).getOrElse(0L)
    }
    rddInfos.filter(_.isCached)
  }

yourRDD.toDebugString 来自 RDD 也使用这个。代码 here


一般注意事项:

在我看来,为了在每个分区中获得最佳记录数并检查您的重新分区是否正确并且它们分布均匀,我建议尝试如下...并调整您的重新分区数。然后测量分区的大小......会更明智。解决这种

yourdf.rdd.mapPartitionsWithIndex{case (index,rows) => Iterator((index,rows.size))}
  .toDF("PartitionNumber","NumberOfRecordsPerPartition")
  .show

或现有的 spark 函数(基于 spark 版本)

import org.apache.spark.sql.functions._ 

df.withColumn("partitionId", sparkPartitionId()).groupBy("partitionId").count.show

我的建议是

from sys import getsizeof

def compare_size_two_object(one, two):
    '''compare size of two files in bites'''
    print(getsizeof(one), 'versus', getsizeof(two))