需要了解 Dataframe Spark 中的分区细节
Need to Know Partitioning Details in Dataframe Spark
我正在尝试根据查询从 DB2 数据库中读取数据。查询的结果集大约有 20 - 4000 万条记录。 DF的分区是基于整数列完成的。
我的问题是,加载数据后,如何检查每个分区创建了多少记录。基本上我想检查的是数据倾斜是否发生?如何检查每个分区的记录数?
例如,您可以映射分区并确定它们的大小:
val rdd = sc.parallelize(0 until 1000, 3)
val partitionSizes = rdd.mapPartitions(iter => Iterator(iter.length)).collect()
// would be Array(333, 333, 334) in this example
这适用于 RDD 和 Dataset/DataFrame API。
我们先创建一个DataFrame
。
rdd=sc.parallelize([('a',22),('b',1),('c',4),('b',1),('d',2),('e',0),('d',3),('a',1),('c',4),('b',7),('a',2),('f',1)] )
df=rdd.toDF(['key','value'])
df=df.repartition(5,"key") # Make 5 Partitions
分区数-
print("Number of partitions: {}".format(df.rdd.getNumPartitions()))
Number of partitions: 5
每个分区的行数。这可以让您了解偏斜 -
print('Partitioning distribution: '+ str(df.rdd.glom().map(len).collect()))
Partitioning distribution: [3, 3, 2, 2, 2]
查看行在分区上的实际分布情况。请注意,如果数据集很大,那么您的系统可能会因为 Out of Memory
问题而崩溃。
print("Partitions structure: {}".format(df.rdd.glom().collect()))
Partitions structure: [
#Partition 1 [Row(key='a', value=22), Row(key='a', value=1), Row(key='a', value=2)],
#Partition 2 [Row(key='b', value=1), Row(key='b', value=1), Row(key='b', value=7)],
#Partition 3 [Row(key='c', value=4), Row(key='c', value=4)],
#Partition 4 [Row(key='e', value=0), Row(key='f', value=1)],
#Partition 5 [Row(key='d', value=2), Row(key='d', value=3)]
]
我正在尝试根据查询从 DB2 数据库中读取数据。查询的结果集大约有 20 - 4000 万条记录。 DF的分区是基于整数列完成的。
我的问题是,加载数据后,如何检查每个分区创建了多少记录。基本上我想检查的是数据倾斜是否发生?如何检查每个分区的记录数?
例如,您可以映射分区并确定它们的大小:
val rdd = sc.parallelize(0 until 1000, 3)
val partitionSizes = rdd.mapPartitions(iter => Iterator(iter.length)).collect()
// would be Array(333, 333, 334) in this example
这适用于 RDD 和 Dataset/DataFrame API。
我们先创建一个DataFrame
。
rdd=sc.parallelize([('a',22),('b',1),('c',4),('b',1),('d',2),('e',0),('d',3),('a',1),('c',4),('b',7),('a',2),('f',1)] )
df=rdd.toDF(['key','value'])
df=df.repartition(5,"key") # Make 5 Partitions
分区数-
print("Number of partitions: {}".format(df.rdd.getNumPartitions()))
Number of partitions: 5
每个分区的行数。这可以让您了解偏斜 -
print('Partitioning distribution: '+ str(df.rdd.glom().map(len).collect()))
Partitioning distribution: [3, 3, 2, 2, 2]
查看行在分区上的实际分布情况。请注意,如果数据集很大,那么您的系统可能会因为 Out of Memory
问题而崩溃。
print("Partitions structure: {}".format(df.rdd.glom().collect()))
Partitions structure: [
#Partition 1 [Row(key='a', value=22), Row(key='a', value=1), Row(key='a', value=2)],
#Partition 2 [Row(key='b', value=1), Row(key='b', value=1), Row(key='b', value=7)],
#Partition 3 [Row(key='c', value=4), Row(key='c', value=4)],
#Partition 4 [Row(key='e', value=0), Row(key='f', value=1)],
#Partition 5 [Row(key='d', value=2), Row(key='d', value=3)]
]