使用 LIMIT 评估 Spark SQL 查询两次时获得相同的结果
Obtain same results when evaluating a Spark SQL query twice with LIMIT
我最近开始使用 pyspark,我遇到了一些我试图更好地理解并避免的行为。
考虑以下代码:
query1 = "SELECT * FROM A where X >= 1000000 and X < 1001000 LIMIT 50"
s1 = spark.sql(query1)
X_vals = s1.select('X').rdd.flatMap(lambda x: x).collect()
query2 = "SELECT * FROM B" + " where Y in " + '(' + ','.join([str(x) for x in X_vals]) + ')'
s2 = spark.sql(query2)
s1.write.mode('overwrite').option("header", True).option("sep",'\t').csv('test/A.csv')
s2.write.mode('overwrite').option("header", True).option("sep",'\t').csv('test/B.csv')
从A
,我从一个范围中获取了50条记录的样本,并将X
的值存储在X_vals
中。然后我从 table B
.
中获取相同的记录(其中 Y
in X_vals
)
稍后,我将两个 table 写入 csv
文件。在生成的 csv
文件中,A
中的 X
与 B
中的 Y
不再匹配。
我认为这是可以解释的行为,是由惰性求值引起的; collect()
语句中选择的记录与 .csv
语句中的记录不同。然而,我对 Spark 的理解还不足以解释为什么会发生这种情况。
所以;为什么会发生这种情况,有没有办法强制查询 return 相同的结果两次(不加入 tables)?
谢谢,
弗洛里安
问题是 LIMIT
的执行。它通过将记录洗牌到单个分区来实现(您可以在 的优秀答案中找到详细解释)。
同时,Spark遵循SQL标准规则——如果没有明确的顺序,那么优化器可以选择任意记录。
val df = spark.range(1000)
df.where($"id".between(100, 200)).limit(10).explain
== Physical Plan ==
CollectLimit 10
+- *LocalLimit 10
+- *Filter ((id#16L >= 100) && (id#16L <= 200))
+- *Range (0, 1000, step=1, splits=4)
为了获得确定性(某种程度上,AFAIK 关系不确定地解决)命令使用 orderBy
子句,将 CollectLimit
转换为 TakeOrderedAndProject
:
df.where($"id".between(100, 200)).orderBy("id").limit(10).explain
== Physical Plan ==
TakeOrderedAndProject(limit=10, orderBy=[id#16L ASC NULLS FIRST], output=[id#16L])
+- *Filter ((id#16L >= 100) && (id#16L <= 200))
+- *Range (0, 1000, step=1, splits=4)
我最近开始使用 pyspark,我遇到了一些我试图更好地理解并避免的行为。
考虑以下代码:
query1 = "SELECT * FROM A where X >= 1000000 and X < 1001000 LIMIT 50"
s1 = spark.sql(query1)
X_vals = s1.select('X').rdd.flatMap(lambda x: x).collect()
query2 = "SELECT * FROM B" + " where Y in " + '(' + ','.join([str(x) for x in X_vals]) + ')'
s2 = spark.sql(query2)
s1.write.mode('overwrite').option("header", True).option("sep",'\t').csv('test/A.csv')
s2.write.mode('overwrite').option("header", True).option("sep",'\t').csv('test/B.csv')
从A
,我从一个范围中获取了50条记录的样本,并将X
的值存储在X_vals
中。然后我从 table B
.
Y
in X_vals
)
稍后,我将两个 table 写入 csv
文件。在生成的 csv
文件中,A
中的 X
与 B
中的 Y
不再匹配。
我认为这是可以解释的行为,是由惰性求值引起的; collect()
语句中选择的记录与 .csv
语句中的记录不同。然而,我对 Spark 的理解还不足以解释为什么会发生这种情况。
所以;为什么会发生这种情况,有没有办法强制查询 return 相同的结果两次(不加入 tables)?
谢谢,
弗洛里安
问题是 LIMIT
的执行。它通过将记录洗牌到单个分区来实现(您可以在
同时,Spark遵循SQL标准规则——如果没有明确的顺序,那么优化器可以选择任意记录。
val df = spark.range(1000)
df.where($"id".between(100, 200)).limit(10).explain
== Physical Plan ==
CollectLimit 10
+- *LocalLimit 10
+- *Filter ((id#16L >= 100) && (id#16L <= 200))
+- *Range (0, 1000, step=1, splits=4)
为了获得确定性(某种程度上,AFAIK 关系不确定地解决)命令使用 orderBy
子句,将 CollectLimit
转换为 TakeOrderedAndProject
:
df.where($"id".between(100, 200)).orderBy("id").limit(10).explain
== Physical Plan ==
TakeOrderedAndProject(limit=10, orderBy=[id#16L ASC NULLS FIRST], output=[id#16L])
+- *Filter ((id#16L >= 100) && (id#16L <= 200))
+- *Range (0, 1000, step=1, splits=4)