为什么在重新分区 Spark Dataframe 时会得到这么多空分区?
Why do I get so many empty partitions when repartionning a Spark Dataframe?
我想在 3 列上划分数据框 "df1"。对于这 3 列,此数据框恰好有 990 个独特的组合:
In [17]: df1.createOrReplaceTempView("df1_view")
In [18]: spark.sql("select count(*) from (select distinct(col1,col2,col3) from df1_view) as t").show()
+--------+
|count(1)|
+--------+
| 990|
+--------+
为了优化此数据帧的处理,我想对 df1 进行分区以获得 990 个分区,每个分区对应一个键可能性:
In [19]: df1.rdd.getNumPartitions()
Out[19]: 24
In [20]: df2 = df1.repartition(990, "col1", "col2", "col3")
In [21]: df2.rdd.getNumPartitions()
Out[21]: 990
我写了一个简单的方法来计算每个分区中的行数:
In [22]: def f(iterator):
...: a = 0
...: for partition in iterator:
...: a = a + 1
...: print(a)
...:
In [23]: df2.foreachPartition(f)
而且我注意到我得到的实际上是 628 个具有一个或多个键值的分区,以及 362 个空分区。
我假设 spark 会以均匀的方式重新分区(1 个键值 = 1 个分区),但事实并非如此,我觉得这种重新分区会增加数据倾斜,即使它应该是相反的。 ..
Spark 使用什么算法按列对数据框进行分区?
有没有办法实现我认为可能实现的目标?
我在 Cloudera 上使用 Spark 2.2.0。
我想在 3 列上划分数据框 "df1"。对于这 3 列,此数据框恰好有 990 个独特的组合:
In [17]: df1.createOrReplaceTempView("df1_view")
In [18]: spark.sql("select count(*) from (select distinct(col1,col2,col3) from df1_view) as t").show()
+--------+
|count(1)|
+--------+
| 990|
+--------+
为了优化此数据帧的处理,我想对 df1 进行分区以获得 990 个分区,每个分区对应一个键可能性:
In [19]: df1.rdd.getNumPartitions()
Out[19]: 24
In [20]: df2 = df1.repartition(990, "col1", "col2", "col3")
In [21]: df2.rdd.getNumPartitions()
Out[21]: 990
我写了一个简单的方法来计算每个分区中的行数:
In [22]: def f(iterator):
...: a = 0
...: for partition in iterator:
...: a = a + 1
...: print(a)
...:
In [23]: df2.foreachPartition(f)
而且我注意到我得到的实际上是 628 个具有一个或多个键值的分区,以及 362 个空分区。
我假设 spark 会以均匀的方式重新分区(1 个键值 = 1 个分区),但事实并非如此,我觉得这种重新分区会增加数据倾斜,即使它应该是相反的。 ..
Spark 使用什么算法按列对数据框进行分区? 有没有办法实现我认为可能实现的目标?
我在 Cloudera 上使用 Spark 2.2.0。