PySpark - 优化 parquet 读取后的分区数
PySpark - optimize number of partitions after parquet read
在由 year
和 month
分区的镶木地板数据湖中,spark.default.parallelism
设置为即 4
,假设我想创建一个由月份组成的 DataFrame 2017 年 11~12 月,2018 年第 1~3 个月,两个来源 A
和 B
.
df = spark.read.parquet(
"A.parquet/_YEAR={2017}/_MONTH={11,12}",
"A.parquet/_YEAR={2018}/_MONTH={1,2,3}",
"B.parquet/_YEAR={2017}/_MONTH={11,12}",
"B.parquet/_YEAR={2018}/_MONTH={1,2,3}",
)
如果我得到分区数,Spark 默认使用 spark.default.parallelism
:
df.rdd.getNumPartitions()
Out[4]: 4
考虑到在创建 df
之后我需要在每个周期执行 join
和 groupBy
操作,并且数据或多或少均匀分布在每个周期(大约每个周期 1000 万行):
问题
- 重新分区会提高后续操作的性能吗?
- 如果是这样,如果我有 10 个不同的时期(A 和 B 每年 5 个),我是否应该按时期数重新分区并明确引用要重新分区的列 (
df.repartition(10,'_MONTH','_YEAR')
)?
Will a repartition improve the performance of my subsequent operations?
通常不会。抢先重新分区数据的唯一原因是为了避免在相同的 Dataset
用于多个连接时,基于相同的条件
进一步洗牌
If so, if I have 10 different periods (5 per year in both A and B), should I repartition by the number of periods and explicitly reference the columns to repartition (df.repartition(10,'_MONTH','_YEAR'))?
让我们一步步来:
should I repartition by the number of periods
从业者不保证 1:1 级别和分区之间的关系,所以唯一要记住的是,非空分区不能多于唯一键,因此使用明显更大的值不会有道理。
and explicitly reference the columns to repartition
如果您 repartition
然后 join
或 groupBy
对两个部分使用同一组列是唯一明智的解决方案。
总结
repartitoning
before join 在两种情况下有意义:
若有多个后续joins
df_ = df.repartition(10, "foo", "bar")
df_.join(df1, ["foo", "bar"])
...
df_.join(df2, ["foo", "bar"])
当 output 分区的所需数量与 spark.sql.shuffle.partitions
不同时使用单一连接(并且没有广播连接)
spark.conf.get("spark.sql.shuffle.partitions")
# 200
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df1_ = df1.repartition(11, "foo", "bar")
df2_ = df2.repartition(11, "foo", "bar")
df1_.join(df2_, ["foo", "bar"]).rdd.getNumPartitions()
# 11
df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions()
# 200
这可能优于:
spark.conf.set("spark.sql.shuffle.partitions", 11)
df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions()
spark.conf.set("spark.sql.shuffle.partitions", 200)
在由 year
和 month
分区的镶木地板数据湖中,spark.default.parallelism
设置为即 4
,假设我想创建一个由月份组成的 DataFrame 2017 年 11~12 月,2018 年第 1~3 个月,两个来源 A
和 B
.
df = spark.read.parquet(
"A.parquet/_YEAR={2017}/_MONTH={11,12}",
"A.parquet/_YEAR={2018}/_MONTH={1,2,3}",
"B.parquet/_YEAR={2017}/_MONTH={11,12}",
"B.parquet/_YEAR={2018}/_MONTH={1,2,3}",
)
如果我得到分区数,Spark 默认使用 spark.default.parallelism
:
df.rdd.getNumPartitions()
Out[4]: 4
考虑到在创建 df
之后我需要在每个周期执行 join
和 groupBy
操作,并且数据或多或少均匀分布在每个周期(大约每个周期 1000 万行):
问题
- 重新分区会提高后续操作的性能吗?
- 如果是这样,如果我有 10 个不同的时期(A 和 B 每年 5 个),我是否应该按时期数重新分区并明确引用要重新分区的列 (
df.repartition(10,'_MONTH','_YEAR')
)?
Will a repartition improve the performance of my subsequent operations?
通常不会。抢先重新分区数据的唯一原因是为了避免在相同的 Dataset
用于多个连接时,基于相同的条件
If so, if I have 10 different periods (5 per year in both A and B), should I repartition by the number of periods and explicitly reference the columns to repartition (df.repartition(10,'_MONTH','_YEAR'))?
让我们一步步来:
should I repartition by the number of periods
从业者不保证 1:1 级别和分区之间的关系,所以唯一要记住的是,非空分区不能多于唯一键,因此使用明显更大的值不会有道理。
and explicitly reference the columns to repartition
如果您
repartition
然后join
或groupBy
对两个部分使用同一组列是唯一明智的解决方案。
总结
repartitoning
before join 在两种情况下有意义:
若有多个后续
joins
df_ = df.repartition(10, "foo", "bar") df_.join(df1, ["foo", "bar"]) ... df_.join(df2, ["foo", "bar"])
当 output 分区的所需数量与
spark.sql.shuffle.partitions
不同时使用单一连接(并且没有广播连接)spark.conf.get("spark.sql.shuffle.partitions") # 200 spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) df1_ = df1.repartition(11, "foo", "bar") df2_ = df2.repartition(11, "foo", "bar") df1_.join(df2_, ["foo", "bar"]).rdd.getNumPartitions() # 11 df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions() # 200
这可能优于:
spark.conf.set("spark.sql.shuffle.partitions", 11) df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions() spark.conf.set("spark.sql.shuffle.partitions", 200)