插入前 Spark 混洗数据

Spark shuffling data before insert

CalcDf().show 结果分为 13 个阶段 (0-12) +1(13) show 本身。

当我尝试将结果写入 table 时,我认为应该只有 13 个阶段 (0-12),但我看到了额外的阶段 (13)。它从哪里来,有什么作用?我没有执行任何需要洗牌的重新分区或其他操作。据我了解,spark 应该只将 1100 个文件写入 table,但事实并非如此。

CalcDf()
.write
.mode(SaveMode.Overwrite)
.insertInto("tn")

CalcDf() 逻辑

val dim = spark.sparkContext.broadcast(
spark.table("dim")
.as[Dim]
.map(r => r.id-> r.col)
.collect().toMap
)

spark.table("table")
.as[CustomCC]
.groupByKey(_.id)
.flatMapGroups{case(k, iterator) => CustomCC.mapRows(iterator, dim)}
.withColumn("time_key", lit("2021-07-01"))

前一阶段 #12 已完成随机写入,因此任何后续阶段都必须通过随机读取(您在 #13 中注意到)从中读取数据。

为什么要多一个阶段?

因为第 12 阶段有随机写入而不是输出

为了理解第 12 阶段,请提供有关如何构建 CalDf 的信息。

编辑

groupByKey 将进行随机写入,以便在单个执行程序 JVM 上获得相同的 ID。

第 13 阶段正在读取此混洗后的数据并计算映射操作。

任务计数的差异可归因于操作。
在 show() 中,它还没有读取整个打乱的数据。可能是因为它显示 20 行(默认) 而在 insertInto(...) 中,它对整个数据进行操作,因此读取所有数据。

stage #13 不仅仅是因为它在写入文件,而且它实际上在进行计算。