Apache Spark - shuffle 写入的数据多于输入数据的大小
Apache Spark - shuffle writes more data than the size of the input data
我在本地模式下使用 Spark 2.1,我是 运行 这个简单的应用程序。
val N = 10 << 20
sparkSession.conf.set("spark.sql.shuffle.partitions", "5")
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", (N + 1).toString)
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
val df1 = sparkSession.range(N).selectExpr(s"id as k1")
val df2 = sparkSession.range(N / 5).selectExpr(s"id * 3 as k2")
df1.join(df2, col("k1") === col("k2")).count()
在这里,range(N) 创建了一个 Long 的数据集(具有唯一值),所以我假设
的大小
- df1 = N * 8 bytes ~ 80MB
- df2 = N / 5 * 8 bytes ~ 16MB
好了,现在我们以df1为例。
df1 包含 8 个分区 和 shuffledRDDs of 5,所以我假设
- # of mappers (M) = 8
- # of reducers (R) = 5
由于分区数量较少,Spark 将使用 Hash Shuffle 将在磁盘中创建 M * R 个文件,但我不明白是否每个文件都有所有数据,因此 each_file_size = data_size 导致 M * R * data_size 文件或 all_files = data_size.
然而,当执行这个应用程序时,shuffle write of df1 = 160MB 这与上述任何一种情况都不匹配。
Spark UI
我在这里错过了什么?为什么 shuffle 写入数据的大小增加了一倍?
首先,让我们看看data size total(min, med, max)
是什么意思:
根据 dataSize.add(row.getSizeInBytes)
的 SQLMetrics.scala#L88 and ShuffleExchange.scala#L43, the data size total(min, med, max)
we see is the final value of dataSize
metric of shuffle. Then, how is it updated? It get updated each time a record is serialized: UnsafeRowSerializer.scala#L66(UnsafeRow
是 Spark SQL 中记录的内部表示)。
在内部,UnsafeRow
由 byte[]
支持,并在序列化期间直接复制到底层输出流,其 getSizeInBytes()
方法只是 return 的长度byte[]
。因此,最初的问题转化为:为什么字节表示是记录中唯一的 long
列的两倍大? UnsafeRow.scala 文档给了我们答案:
Each tuple has three parts: [null bit set] [values] [variable length portion]
The bit set is used for null tracking and is aligned to 8-byte word boundaries. It stores one bit per field.
因为它是 8 字节字对齐的,唯一的 1 个空位占用另外 8 个字节,与长列的宽度相同。因此,每个 UnsafeRow
代表您使用 16 个字节的一长列行。
我在本地模式下使用 Spark 2.1,我是 运行 这个简单的应用程序。
val N = 10 << 20
sparkSession.conf.set("spark.sql.shuffle.partitions", "5")
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", (N + 1).toString)
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
val df1 = sparkSession.range(N).selectExpr(s"id as k1")
val df2 = sparkSession.range(N / 5).selectExpr(s"id * 3 as k2")
df1.join(df2, col("k1") === col("k2")).count()
在这里,range(N) 创建了一个 Long 的数据集(具有唯一值),所以我假设
的大小
- df1 = N * 8 bytes ~ 80MB
- df2 = N / 5 * 8 bytes ~ 16MB
好了,现在我们以df1为例。 df1 包含 8 个分区 和 shuffledRDDs of 5,所以我假设
- # of mappers (M) = 8
- # of reducers (R) = 5
由于分区数量较少,Spark 将使用 Hash Shuffle 将在磁盘中创建 M * R 个文件,但我不明白是否每个文件都有所有数据,因此 each_file_size = data_size 导致 M * R * data_size 文件或 all_files = data_size.
然而,当执行这个应用程序时,shuffle write of df1 = 160MB 这与上述任何一种情况都不匹配。
Spark UI
我在这里错过了什么?为什么 shuffle 写入数据的大小增加了一倍?
首先,让我们看看data size total(min, med, max)
是什么意思:
根据 dataSize.add(row.getSizeInBytes)
的 SQLMetrics.scala#L88 and ShuffleExchange.scala#L43, the data size total(min, med, max)
we see is the final value of dataSize
metric of shuffle. Then, how is it updated? It get updated each time a record is serialized: UnsafeRowSerializer.scala#L66(UnsafeRow
是 Spark SQL 中记录的内部表示)。
在内部,UnsafeRow
由 byte[]
支持,并在序列化期间直接复制到底层输出流,其 getSizeInBytes()
方法只是 return 的长度byte[]
。因此,最初的问题转化为:为什么字节表示是记录中唯一的 long
列的两倍大? UnsafeRow.scala 文档给了我们答案:
Each tuple has three parts: [null bit set] [values] [variable length portion]
The bit set is used for null tracking and is aligned to 8-byte word boundaries. It stores one bit per field.
因为它是 8 字节字对齐的,唯一的 1 个空位占用另外 8 个字节,与长列的宽度相同。因此,每个 UnsafeRow
代表您使用 16 个字节的一长列行。