Apache Spark:获取每个分区的记录数
Apache Spark: Get number of records per partition
我想看看我们怎样才能得到关于每个分区的信息,比如总号。当 Spark 作业以部署模式作为 yarn 集群提交时,驱动程序端每个分区中的记录数,以便在控制台上记录或打印。
您可以像这样获取每个分区的记录数:
df
.rdd
.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
.toDF("partition_number","number_of_records")
.show
但是这也会自己启动一个Spark Job(因为spark必须读取文件才能获取记录数)。
Spark 也可以读取配置单元 table 统计信息,但我不知道如何显示这些元数据..
我会使用内置函数。它应该尽可能高效:
import org.apache.spark.sql.functions.spark_partition_id
df.groupBy(spark_partition_id).count
Spark 1.5 解决方案:
(sparkPartitionId()
存在于 org.apache.spark.sql.functions
中)
import org.apache.spark.sql.functions._
df.withColumn("partitionId", sparkPartitionId()).groupBy("partitionId").count.show
如@Raphael Roth 所述
mapPartitionsWithIndex
是最好的方法,将适用于所有版本的 spark,因为它是基于 RDD 的方法
Spark/scala:
val numPartitions = 20000
val a = sc.parallelize(0 until 1e6.toInt, numPartitions )
val l = a.glom().map(_.length).collect() # get length of each partition
print(l.min, l.max, l.sum/l.length, l.length) # check if skewed
PySpark:
num_partitions = 20000
a = sc.parallelize(range(int(1e6)), num_partitions)
l = a.glom().map(len).collect() # get length of each partition
print(min(l), max(l), sum(l)/len(l), len(l)) # check if skewed
对于 dataframe
也是可能的,而不仅仅是对于 RDD
。
只需将 DF.rdd.glom
... 添加到上面的代码中即可。
致谢:Mike Dusenberry @https://issues.apache.org/jira/browse/SPARK-17817
对于未来的 PySpark 用户:
from pyspark.sql.functions import spark_partition_id
rawDf.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show()
PySpark:
from pyspark.sql.functions import spark_partition_id
df.select(spark_partition_id().alias("partitionId")).groupBy("partitionId").count()
我想看看我们怎样才能得到关于每个分区的信息,比如总号。当 Spark 作业以部署模式作为 yarn 集群提交时,驱动程序端每个分区中的记录数,以便在控制台上记录或打印。
您可以像这样获取每个分区的记录数:
df
.rdd
.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
.toDF("partition_number","number_of_records")
.show
但是这也会自己启动一个Spark Job(因为spark必须读取文件才能获取记录数)。
Spark 也可以读取配置单元 table 统计信息,但我不知道如何显示这些元数据..
我会使用内置函数。它应该尽可能高效:
import org.apache.spark.sql.functions.spark_partition_id
df.groupBy(spark_partition_id).count
Spark 1.5 解决方案:
(sparkPartitionId()
存在于 org.apache.spark.sql.functions
中)
import org.apache.spark.sql.functions._
df.withColumn("partitionId", sparkPartitionId()).groupBy("partitionId").count.show
如@Raphael Roth 所述
mapPartitionsWithIndex
是最好的方法,将适用于所有版本的 spark,因为它是基于 RDD 的方法
Spark/scala:
val numPartitions = 20000
val a = sc.parallelize(0 until 1e6.toInt, numPartitions )
val l = a.glom().map(_.length).collect() # get length of each partition
print(l.min, l.max, l.sum/l.length, l.length) # check if skewed
PySpark:
num_partitions = 20000
a = sc.parallelize(range(int(1e6)), num_partitions)
l = a.glom().map(len).collect() # get length of each partition
print(min(l), max(l), sum(l)/len(l), len(l)) # check if skewed
对于 dataframe
也是可能的,而不仅仅是对于 RDD
。
只需将 DF.rdd.glom
... 添加到上面的代码中即可。
致谢:Mike Dusenberry @https://issues.apache.org/jira/browse/SPARK-17817
对于未来的 PySpark 用户:
from pyspark.sql.functions import spark_partition_id
rawDf.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show()
PySpark:
from pyspark.sql.functions import spark_partition_id
df.select(spark_partition_id().alias("partitionId")).groupBy("partitionId").count()