了解 Spark Explain:收集 vs 全局 vs 局部限制

Understanding Spark Explain: Collect vs Global vs Local Limit

我想看看在 Spark/AWS Glue

中进行限制之间的区别

我试过使用 Spark SQL

spark.sql("SELECT * FROM flights LIMIT 10")

解释看起来像这样:

CollectLimit 10
+- *FileScan parquet xxxxxx.flights[Id#31,...] Batched: true, Format: Parquet, Location: CatalogFileIndex[s3://xxxxxx/flights], PartitionCount: 14509, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...

然后我尝试使用 AWS Glue 数据目录看看它是否更快

gdf = glueContext.create_dynamic_frame.from_catalog(database = "xxxxxx", table_name = "xxxxxx")
df = gdf.toDF()
df = df.limit(10)

df.explain(True)

df.show(10)

解释如下:

GlobalLimit 10
+- LocalLimit 10
+- LogicalRDD [Id#70, ...]

第一次在 5 分钟内运行,第二次在 4 分钟内运行,还不是很重要,但我认为查询数据目录速度更快,或者对数据框进行限制比对数据框进行限制更好火花 SQL?

收集限制、全球限制和本地限制之间有什么区别?我猜本地限制意味着它确实在本地限制然后驱动程序将执行全局限制以给出最终结果。但是为什么 Spark SQL 不也做这个优化呢?

Spark 在做任何限制之前是否读取所有底层 parquet 文件?在这个例子中,有没有办法告诉 spark 读取直到只有 10 行?

  1. SQL 方式,程序化数据集创建 - 两种情况下的控制流程相同,它通过 Spark SQL 催化剂。在您的情况下,当查询是第一次 运行 时,它会从 Metastore 中获取有关 table 的元数据并将其缓存,在后续查询中,它会被重用,这可能是第一个查询缓慢。
  2. 没有 LogicalPlan 节点,因为 CollectLimit,只有 CollectLimitExec physicalplan 节点。 limit 实现为 LocalLimit 后跟 GlobalLimit(link to code)
  3. Spark 以增量方式执行 limit
    它尝试使用一个分区检索给定数量的行。 如果不满足行数,Spark 会查询接下来的 4 个分区(由 spark.sql.limit.scaleUpFactor 决定,默认为 4),然后是 16 等等,直到满足限制或数据耗尽。