Apache Spark - shuffle 写入的数据多于输入数据的大小

Apache Spark - shuffle writes more data than the size of the input data

我在本地模式下使用 Spark 2.1,我是 运行 这个简单的应用程序。

val N = 10 << 20

sparkSession.conf.set("spark.sql.shuffle.partitions", "5")
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", (N + 1).toString)
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")

val df1 = sparkSession.range(N).selectExpr(s"id as k1")
val df2 = sparkSession.range(N / 5).selectExpr(s"id * 3 as k2")

df1.join(df2, col("k1") === col("k2")).count()

在这里,range(N) 创建了一个 Long 的数据集(具有唯一值),所以我假设

的大小
  • df1 = N * 8 bytes ~ 80MB
  • df2 = N / 5 * 8 bytes ~ 16MB

好了,现在我们以df1为例。 df1 包含 8 个分区shuffledRDDs of 5,所以我假设

  • # of mappers (M) = 8
  • # of reducers (R) = 5

由于分区数量较少,Spark 将使用 Hash Shuffle 将在磁盘中创建 M * R 个文件,但我不明白是否每个文件都有所有数据,因此 each_file_size = data_size 导致 M * R * data_size 文件或 all_files = data_size.

然而,当执行这个应用程序时,shuffle write of df1 = 160MB 这与上述任何一种情况都不匹配。

Spark UI

我在这里错过了什么?为什么 shuffle 写入数据的大小增加了一倍?

首先,让我们看看data size total(min, med, max)是什么意思:

根据 dataSize.add(row.getSizeInBytes)SQLMetrics.scala#L88 and ShuffleExchange.scala#L43, the data size total(min, med, max) we see is the final value of dataSize metric of shuffle. Then, how is it updated? It get updated each time a record is serialized: UnsafeRowSerializer.scala#L66UnsafeRow 是 Spark SQL 中记录的内部表示)。

在内部,UnsafeRowbyte[] 支持,并在序列化期间直接复制到底层输出流,其 getSizeInBytes() 方法只是 return 的长度byte[]。因此,最初的问题转化为:为什么字节表示是记录中唯一的 long 列的两倍大? UnsafeRow.scala 文档给了我们答案:

Each tuple has three parts: [null bit set] [values] [variable length portion]

The bit set is used for null tracking and is aligned to 8-byte word boundaries. It stores one bit per field.

因为它是 8 字节字对齐的,唯一的 1 个空位占用另外 8 个字节,与长列的宽度相同。因此,每个 UnsafeRow 代表您使用 16 个字节的一长列行。