Different partition number when unioning Spark DataFrames with the Scala and Python APIs
I am checking the number of partitions of the union of two identical Spark DataFrames, and I noticed that the results are not the same between the Scala and Python APIs.
With Python, the number of partitions of the union is the sum of the partition counts of the two DataFrames, which is the expected behavior.
Python
from pyspark.sql.types import IntegerType
df1 = spark.createDataFrame(range(100000), IntegerType()).repartition(10)
print("df1 partitions: %d" %df1.rdd.getNumPartitions())
df2 = spark.createDataFrame(range(100000), IntegerType()).repartition(10)
print("df2 partitions: %d" %df2.rdd.getNumPartitions())
df3 = df1.union(df2)
print("df3 partitions: %d" %df3.rdd.getNumPartitions())
Result:
df1 partitions: 10
df2 partitions: 10
df3 partitions: 20
However, with Scala, the number of partitions of the union does not change.
Scala
val df1 = (1 to 100000).toDF.repartition(10)
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")
val df2 = (1 to 100000 by 1).toDF.repartition(10)
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")
val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")
Result:
df1 partitions: 10
df2 partitions: 10
df3 partitions: 10
This only happens when the two DataFrames are built in exactly the same way.
When they are not:
val df1 = (1 to 100000).toDF.repartition(10)
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")
val df2 = (1 to 100000 by 2).toDF.repartition(10)
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")
val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")
I get the expected result (the sum):
df1 partitions: 10
df2 partitions: 10
df3 partitions: 20
My understanding is that with the Scala API, Spark is able to optimize the union in some cases. Is that true? Does it mean that the execution plan of a union can differ between the Scala and Python APIs?
I am asking because I noticed that unions are more efficient with Scala than with Python, especially when chaining multiple unions.
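To see whether the plans really differ, a minimal check (my own sketch, not part of the original question; run e.g. in spark-shell where the implicits for toDF are in scope) is to call explain(true) on the union and look at the physical plan:

// Sketch: inspect the physical plan of the union. The partition count follows
// the plan: 10 if the second Exchange was replaced by a ReusedExchange,
// 20 if both Exchanges are kept.
val df1 = (1 to 100000).toDF.repartition(10)
val df2 = (1 to 100000).toDF.repartition(10)
val df3 = df1.union(df2)
df3.explain(true)  // prints the logical and physical plans
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")

The observations further down show exactly these plans for the Scala and PySpark cases.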
Definition of union in Spark (Scala):
def union(other: Dataset[T]): Dataset[T] = withSetOperator {
  // This breaks caching, but it's usually ok because it addresses a very specific use case:
  // using union to union many files or partitions.
  CombineUnions(Union(logicalPlan, other.logicalPlan))
}
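The CombineUnions(...) call above flattens nested unions into a single Union node as each union is constructed, which is presumably why chaining many unions stays cheap. A small sketch of my own (spark-shell style) to observe this:

// Sketch: chain several unions and inspect the plan. CombineUnions should
// flatten the nested Union(Union(Union(a, b), c), d) tree into a single
// Union node with four children.
val parts = (1 to 4).map(_ => (1 to 1000).toDF)
val combined = parts.reduce(_ union _)
combined.explain(true)  // optimized plan should show one Union over all inputs
println(s"partitions after chained unions: ${combined.rdd.getNumPartitions}")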
Definition of union in PySpark:
def union(self, other):
    """Return a new :class:`DataFrame` containing union of rows in this and another
    :class:`DataFrame`.

    This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union
    (that does deduplication of elements), use this function followed by :func:`distinct`.

    Also as standard in SQL, this function resolves columns by position (not by name).
    """
    return DataFrame(self._jdf.union(other._jdf), self.sql_ctx)
https://github.com/apache/spark/blob/master/python/pyspark/sql/dataframe.py
Will keep updating this with any findings.
Observation 1 -- The physical plans differ between Scala and Python.
Union physical plan in PySpark:
:- Exchange RoundRobinPartitioning(10), [id=#1318]
: +- *(1) Scan ExistingRDD[value#148]
+- Exchange RoundRobinPartitioning(10), [id=#1320]
+- *(2) Scan ExistingRDD[value#154]
== Physical Plan scala ==
Union
:- Exchange RoundRobinPartitioning(10), [id=#1012]
: +- LocalTableScan [value#122]
+- ReusedExchange [value#131], Exchange RoundRobinPartitioning(10), [id=#1012]
Scala with Range (1 to 10 by 2), i.e. val df2 = (1 to 10 by 2).toDF.repartition(10) == Physical Plan ==
Union
:- Exchange RoundRobinPartitioning(10), [id=#1644]
: +- LocalTableScan [value#184]
+- Exchange RoundRobinPartitioning(10), [id=#1646]
+- LocalTableScan [value#193]
Observation 2 -- A union in Spark does not trigger a shuffle; it is a very efficient operation. I believe it is the explicit repartitioning of df1 and df2 that causes the partition count of the union df3 to change. If you do not explicitly repartition your input DataFrames, you end up with a union df whose partition count equals the sum of those of df1 and df2. I tried a few permutations on the same data and got the results below.
Case 1
from pyspark.sql.types import IntegerType
df1 = spark.createDataFrame(range(100000), IntegerType())
print("df1 partitions: %d" %df1.rdd.getNumPartitions())
print("df1 partitioner: %s" %df1.rdd.partitioner)
df2 = spark.createDataFrame(range(100000), IntegerType())
print("df2 partitions: %d" %df2.rdd.getNumPartitions())
print("df2 partitioner: %s" %df2.rdd.partitioner)
df3 = df1.union(df2)
print("df3 partitions: %d" %df3.rdd.getNumPartitions())
print("df3 partitioner: %s" %df3.rdd.partitioner)
******O/P*********
df1 partitions: 8
df1 partitioner: None
df2 partitions: 8
df2 partitioner: None
df3 partitions: 16
df3 partitioner: None
Case 2
val df1 = (1 to 100000).toDF
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")
println(s"df1 partitioner: ${df1.rdd.partitioner}")
val df2 = (1 to 100000).toDF
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")
println(s"df2 partitioner: ${df2.rdd.partitioner}")
df1.union(df2).explain()
val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")
println(s"df3 partitioner: ${df3.rdd.partitioner}")
******O/P*********
df1 partitions: 8
df1 partitioner: None
df2 partitions: 8
df2 partitioner: None
df3 partitions: 16
df3 partitioner: None
Case 3
val df1 = (1 to 100000).toDF
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")
println(s"df1 partitioner: ${df1.rdd.partitioner}")
val df2 = (1 to 100000 by 2).toDF
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")
println(s"df2 partitioner: ${df2.rdd.partitioner}")
val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")
println(s"df3 partitioner: ${df3.rdd.partitioner}")
****O/P****
df1 partitions: 8
df1 partitioner: None
df2 partitions: 8
df2 partitioner: None
df3 partitions: 16
df3 partitioner: None
The clue comes from the Scala engine's explain output:
Union
:- Exchange RoundRobinPartitioning(10), [id=#757]
: +- LocalTableScan [value#154]
+- ReusedExchange [value#159], Exchange RoundRobinPartitioning(10), [id=#757]
ReusedExchange is a form of optimization; Catalyst recognizes the two inputs as identical.
If you had one DataFrame with 10000 entries and one with 10001, you would get 20 partitions. Spark is being a bit clever here.
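A quick way to confirm whether the exchange was actually reused (and therefore whether to expect 10 or 20 partitions) is to search the executed plan for ReusedExchange. This is a sketch of my own; it assumes the default spark.sql.exchange.reuse=true, spark-shell style implicits, and the helper name hasReusedExchange is hypothetical:

import org.apache.spark.sql.DataFrame

// Helper (name is mine): does the physical plan contain a ReusedExchange node?
def hasReusedExchange(df: DataFrame): Boolean =
  df.queryExecution.executedPlan.toString.contains("ReusedExchange")

val dfA = (1 to 100000).toDF.repartition(10)
val dfB = (1 to 100000).toDF.repartition(10)      // built identically to dfA
val dfC = (1 to 100000 by 2).toDF.repartition(10) // built differently

println(hasReusedExchange(dfA.union(dfB)))  // expected: true  -> 10 partitions
println(hasReusedExchange(dfA.union(dfC)))  // expected: false -> 20 partitions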