PySpark - 优化 parquet 读取后的分区数

PySpark - optimize number of partitions after parquet read

在由 yearmonth 分区的镶木地板数据湖中,spark.default.parallelism 设置为即 4,假设我想创建一个由月份组成的 DataFrame 2017 年 11~12 月,2018 年第 1~3 个月,两个来源 AB.

df = spark.read.parquet(
    "A.parquet/_YEAR={2017}/_MONTH={11,12}",
    "A.parquet/_YEAR={2018}/_MONTH={1,2,3}",
    "B.parquet/_YEAR={2017}/_MONTH={11,12}",
    "B.parquet/_YEAR={2018}/_MONTH={1,2,3}",
)

如果我得到分区数,Spark 默认使用 spark.default.parallelism

df.rdd.getNumPartitions()
Out[4]: 4

考虑到在创建 df 之后我需要在每个周期执行 joingroupBy 操作,并且数据或多或少均匀分布在每个周期(大约每个周期 1000 万行):

问题

Will a repartition improve the performance of my subsequent operations?

通常不会。抢先重新分区数据的唯一原因是为了避免在相同的 Dataset 用于多个连接时,基于相同的条件

进一步洗牌

If so, if I have 10 different periods (5 per year in both A and B), should I repartition by the number of periods and explicitly reference the columns to repartition (df.repartition(10,'_MONTH','_YEAR'))?

让我们一步步来:

  • should I repartition by the number of periods

    从业者不保证 1:1 级别和分区之间的关系,所以唯一要记住的是,非空分区不能多于唯一键,因此使用明显更大的值不会有道理。

  • and explicitly reference the columns to repartition

    如果您 repartition 然后 joingroupBy 对两个部分使用同一组列是唯一明智的解决方案。

总结

repartitoning before join 在两种情况下有意义:

  • 若有多个后续joins

    df_ = df.repartition(10, "foo", "bar")
    df_.join(df1, ["foo", "bar"])
    ...
    df_.join(df2, ["foo", "bar"])
    
  • output 分区的所需数量与 spark.sql.shuffle.partitions 不同时使用单一连接(并且没有广播连接)

    spark.conf.get("spark.sql.shuffle.partitions")
    # 200
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    
    df1_ = df1.repartition(11, "foo", "bar")
    df2_ = df2.repartition(11, "foo", "bar")
    
    df1_.join(df2_, ["foo", "bar"]).rdd.getNumPartitions()
    # 11
    
    df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions()
    # 200
    

    这可能优于:

    spark.conf.set("spark.sql.shuffle.partitions", 11)
    df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions()
    spark.conf.set("spark.sql.shuffle.partitions", 200)