为什么 Mongo Spark 连接器 returns 不同且不正确的查询计数?
Why Mongo Spark connector returns different and incorrect counts for a query?
我正在为一个项目评估 Mongo Spark 连接器,但我得到的结果不一致。我在笔记本电脑上本地使用 MongoDB 服务器版本 3.4.5、Spark(通过 PySpark)版本 2.2.0、Mongo Spark Connector 版本 2.11;2.2.0。对于我的测试数据库,我使用 Enron 数据集 http://mongodb-enron-email.s3-website-us-east-1.amazonaws.com/ 我对 Spark SQL 查询很感兴趣,当我开始 运行 简单的计数测试查询时,我收到每个 [=41] 的不同计数=].
这是我的 mongo shell:
的输出
> db.messages.count({'headers.To': 'eric.bass@enron.com'})
203
这是我的 PySpark shell 的一些输出:
In [1]: df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/enron_mail.messages").load()
In [2]: df.registerTempTable("messages")
In [3]: res = spark.sql("select count(*) from messages where headers.To='eric.bass@enron.com'")
In [4]: res.show()
+--------+
|count(1)|
+--------+
| 162|
+--------+
In [5]: res.show()
+--------+
|count(1)|
+--------+
| 160|
+--------+
In [6]: res = spark.sql("select count(_id) from messages where headers.To='eric.bass@enron.com'")
In [7]: res.show()
+----------+
|count(_id)|
+----------+
| 161|
+----------+
In [8]: res.show()
+----------+
|count(_id)|
+----------+
| 162|
+----------+
我在 Google 中搜索了有关此问题的信息,但没有找到任何有用的信息。如果有人知道为什么会发生这种情况以及如何正确处理,请分享您的想法。我有一种感觉,也许我错过了什么或者配置不正确。
更新:
我解决了我的问题。计数不一致的原因是 MongoDefaultPartitioner 包装了使用随机抽样的 MongoSamplePartitioner。老实说,这对我来说是一个很奇怪的默认设置。我个人更喜欢有一个缓慢但一致的分区程序。分区程序选项的详细信息可以在官方 configuration options 文档中找到。
更新:
已将解决方案复制到答案中。
我解决了我的问题。计数不一致的原因是 MongoDefaultPartitioner 包装了使用随机抽样的 MongoSamplePartitioner。老实说,这对我来说是一个很奇怪的默认设置。我个人更喜欢有一个缓慢但一致的分区程序。分区程序选项的详细信息可以在官方 configuration options 文档中找到。
代码:
val df = spark.read
.format("com.mongodb.spark.sql.DefaultSource")
.option("uri", "mongodb://127.0.0.1/enron_mail.messages")
.option("partitioner", "spark.mongodb.input.partitionerOptions.MongoPaginateBySizePartitioner ")
.load()
此问题主要是由于 2.2.0 Mongo 连接器中的 SPARK-151 错误。它在2.2.1版本中得到解决,我已经确认。您可以继续使用 2.2.1 的默认分区程序。
我正在为一个项目评估 Mongo Spark 连接器,但我得到的结果不一致。我在笔记本电脑上本地使用 MongoDB 服务器版本 3.4.5、Spark(通过 PySpark)版本 2.2.0、Mongo Spark Connector 版本 2.11;2.2.0。对于我的测试数据库,我使用 Enron 数据集 http://mongodb-enron-email.s3-website-us-east-1.amazonaws.com/ 我对 Spark SQL 查询很感兴趣,当我开始 运行 简单的计数测试查询时,我收到每个 [=41] 的不同计数=]. 这是我的 mongo shell:
的输出> db.messages.count({'headers.To': 'eric.bass@enron.com'})
203
这是我的 PySpark shell 的一些输出:
In [1]: df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/enron_mail.messages").load()
In [2]: df.registerTempTable("messages")
In [3]: res = spark.sql("select count(*) from messages where headers.To='eric.bass@enron.com'")
In [4]: res.show()
+--------+
|count(1)|
+--------+
| 162|
+--------+
In [5]: res.show()
+--------+
|count(1)|
+--------+
| 160|
+--------+
In [6]: res = spark.sql("select count(_id) from messages where headers.To='eric.bass@enron.com'")
In [7]: res.show()
+----------+
|count(_id)|
+----------+
| 161|
+----------+
In [8]: res.show()
+----------+
|count(_id)|
+----------+
| 162|
+----------+
我在 Google 中搜索了有关此问题的信息,但没有找到任何有用的信息。如果有人知道为什么会发生这种情况以及如何正确处理,请分享您的想法。我有一种感觉,也许我错过了什么或者配置不正确。
更新: 我解决了我的问题。计数不一致的原因是 MongoDefaultPartitioner 包装了使用随机抽样的 MongoSamplePartitioner。老实说,这对我来说是一个很奇怪的默认设置。我个人更喜欢有一个缓慢但一致的分区程序。分区程序选项的详细信息可以在官方 configuration options 文档中找到。
更新: 已将解决方案复制到答案中。
我解决了我的问题。计数不一致的原因是 MongoDefaultPartitioner 包装了使用随机抽样的 MongoSamplePartitioner。老实说,这对我来说是一个很奇怪的默认设置。我个人更喜欢有一个缓慢但一致的分区程序。分区程序选项的详细信息可以在官方 configuration options 文档中找到。
代码:
val df = spark.read
.format("com.mongodb.spark.sql.DefaultSource")
.option("uri", "mongodb://127.0.0.1/enron_mail.messages")
.option("partitioner", "spark.mongodb.input.partitionerOptions.MongoPaginateBySizePartitioner ")
.load()
此问题主要是由于 2.2.0 Mongo 连接器中的 SPARK-151 错误。它在2.2.1版本中得到解决,我已经确认。您可以继续使用 2.2.1 的默认分区程序。