让 Spark、Python 和 MongoDB 协同工作
Getting Spark, Python, and MongoDB to work together
我很难将这些组件正确地组合在一起。我安装了 Spark 并成功运行,我可以在本地、独立地以及通过 YARN 执行 运行 个作业。我已按照建议的步骤进行操作(据我所知)here and here
我正在研究Ubuntu,我拥有的各种组件版本是
- Spark spark-1.5.1-bin-hadoop2.6
- Hadoop hadoop-2.6.1
- Mongo 2.6.10
- Mongo-从 https://github.com/mongodb/mongo-hadoop.git
克隆的 Hadoop 连接器
- Python 2.7.10
我在执行各个步骤时遇到了一些困难,例如将哪些 jar 添加到哪个路径,所以我添加的是
- 在
/usr/local/share/hadoop-2.6.1/share/hadoop/mapreduce
我已经添加了mongo-hadoop-core-1.5.0-SNAPSHOT.jar
- 以下环境变量
export HADOOP_HOME="/usr/local/share/hadoop-2.6.1"
export PATH=$PATH:$HADOOP_HOME/bin
export SPARK_HOME="/usr/local/share/spark-1.5.1-bin-hadoop2.6"
export PYTHONPATH="/usr/local/share/mongo-hadoop/spark/src/main/python"
export PATH=$PATH:$SPARK_HOME/bin
我的Python程序很基础
from pyspark import SparkContext, SparkConf
import pymongo_spark
pymongo_spark.activate()
def main():
conf = SparkConf().setAppName("pyspark test")
sc = SparkContext(conf=conf)
rdd = sc.mongoRDD(
'mongodb://username:password@localhost:27017/mydb.mycollection')
if __name__ == '__main__':
main()
我正在运行使用命令
设置它
$SPARK_HOME/bin/spark-submit --driver-class-path /usr/local/share/mongo-hadoop/spark/build/libs/ --master local[4] ~/sparkPythonExample/SparkPythonExample.py
结果我得到以下输出
Traceback (most recent call last):
File "/home/me/sparkPythonExample/SparkPythonExample.py", line 24, in <module>
main()
File "/home/me/sparkPythonExample/SparkPythonExample.py", line 17, in main
rdd = sc.mongoRDD('mongodb://username:password@localhost:27017/mydb.mycollection')
File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 161, in mongoRDD
return self.mongoPairRDD(connection_string, config).values()
File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 143, in mongoPairRDD
_ensure_pickles(self)
File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 80, in _ensure_pickles
orig_tb)
py4j.protocol.Py4JError
根据here
This exception is raised when an exception occurs in the Java client
code. For example, if you try to pop an element from an empty stack.
The instance of the Java exception thrown is stored in the
java_exception member.
查看 pymongo_spark.py
的源代码和抛出错误的行,上面写着
"Error while communicating with the JVM. Is the MongoDB Spark jar on
Spark's CLASSPATH? : "
所以作为回应,我试图确保传递正确的 jar,但我可能做错了,见下文
$SPARK_HOME/bin/spark-submit --jars /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar --driver-class-path /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar --master local[4] ~/sparkPythonExample/SparkPythonExample.py
我已将 pymongo
导入同一个 python 程序以验证我至少可以使用它访问 MongoDB,我可以。
我知道这里有很多变化的部分,所以如果我能提供更多有用的信息,请告诉我。
我昨天遇到了同样的问题。能够通过将 mongo-java-driver.jar
放在 $HADOOP_HOME/lib
和 mongo-hadoop-core.jar
和 mongo-hadoop-spark.jar
中 $HADOOP_HOME/spark/classpath/emr
(或 $SPARK_CLASSPATH
中的任何其他文件夹来修复它).
如果有帮助请告诉我。
您能否尝试在 spark-submit 命令中使用 --package
选项而不是 --jars ...
:
spark-submit --packages org.mongodb.mongo-hadoop:mongo-hadoop-core:1.3.1,org.mongodb:mongo-java-driver:3.1.0 [REST OF YOUR OPTIONS]
其中一些 jar 文件不是 Uber jar,需要下载更多依赖项才能开始工作。
更新:
2016-07-04
自上次更新 MongoDB Spark Connector matured quite a lot. It provides up-to-date binaries 和基于数据源 API 但它使用 SparkConf
配置,因此主观上不如 Stratio/Spark-MongoDB 灵活。
2016-03-30
自最初的答案以来,我发现了两种不同的方式来从 Spark 连接到 MongoDB:
虽然前者似乎相对不成熟,但后者看起来是比 Mongo-Hadoop 连接器更好的选择,并提供了 Spark SQL API.
# Adjust Scala and package version according to your setup
# although officially 0.11 supports only Spark 1.5
# I haven't encountered any issues on 1.6.1
bin/pyspark --packages com.stratio.datasource:spark-mongodb_2.11:0.11.0
df = (sqlContext.read
.format("com.stratio.datasource.mongodb")
.options(host="mongo:27017", database="foo", collection="bar")
.load())
df.show()
## +---+----+--------------------+
## | x| y| _id|
## +---+----+--------------------+
## |1.0|-1.0|56fbe6f6e4120712c...|
## |0.0| 4.0|56fbe701e4120712c...|
## +---+----+--------------------+
它似乎比mongo-hadoop-spark
稳定得多,支持谓词下推,无需静态配置,简单易用。
原回答:
确实,这里有很多活动部件。我试图通过构建一个简单的 Docker 图像来使其更易于管理,该图像与描述的配置大致匹配(尽管为简洁起见,我省略了 Hadoop 库)。您可以找到 complete source on GitHub
(DOI 10.5281/zenodo.47882) 并从头开始构建它:
git clone https://github.com/zero323/docker-mongo-spark.git
cd docker-mongo-spark
docker build -t zero323/mongo-spark .
或下载我 pushed to Docker Hub 的图片,这样您就可以 docker pull zero323/mongo-spark
):
开始图片:
docker run -d --name mongo mongo:2.6
docker run -i -t --link mongo:mongo zero323/mongo-spark /bin/bash
启动 PySpark shell 通过 --jars
和 --driver-class-path
:
pyspark --jars ${JARS} --driver-class-path ${SPARK_DRIVER_EXTRA_CLASSPATH}
最后看看它是如何工作的:
import pymongo
import pymongo_spark
mongo_url = 'mongodb://mongo:27017/'
client = pymongo.MongoClient(mongo_url)
client.foo.bar.insert_many([
{"x": 1.0, "y": -1.0}, {"x": 0.0, "y": 4.0}])
client.close()
pymongo_spark.activate()
rdd = (sc.mongoRDD('{0}foo.bar'.format(mongo_url))
.map(lambda doc: (doc.get('x'), doc.get('y'))))
rdd.collect()
## [(1.0, -1.0), (0.0, 4.0)]
请注意,mongo-hadoop 似乎会在第一次操作后关闭连接。因此在收集之后调用例如 rdd.count()
将抛出异常。
基于我在创建此图像时遇到的不同问题,我倾向于认为 传递 mongo-hadoop-1.5.0-SNAPSHOT.jar
和 mongo-hadoop-spark-1.5.0-SNAPSHOT.jar
--jars
和 --driver-class-path
是唯一的硬性要求.
备注:
- 此图像大致基于 jaceklaskowski/docker-spark
所以如果有帮助,请务必向 @jacek-laskowski 发送一些善意。
- 如果不需要包含 new API 的开发版本,那么使用
--packages
很可能是更好的选择。
祝你好运!
@见https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage
from pyspark import SparkContext, SparkConf
import pymongo_spark
# Important: activate pymongo_spark.
pymongo_spark.activate()
def main():
conf = SparkConf().setAppName("pyspark test")
sc = SparkContext(conf=conf)
# Create an RDD backed by the MongoDB collection.
# This RDD *does not* contain key/value pairs, just documents.
# If you want key/value pairs, use the mongoPairRDD method instead.
rdd = sc.mongoRDD('mongodb://localhost:27017/db.collection')
# Save this RDD back to MongoDB as a different collection.
rdd.saveToMongoDB('mongodb://localhost:27017/db.other.collection')
# You can also read and write BSON:
bson_rdd = sc.BSONFileRDD('/path/to/file.bson')
bson_rdd.saveToBSON('/path/to/bson/output')
if __name__ == '__main__':
main()
我很难将这些组件正确地组合在一起。我安装了 Spark 并成功运行,我可以在本地、独立地以及通过 YARN 执行 运行 个作业。我已按照建议的步骤进行操作(据我所知)here and here
我正在研究Ubuntu,我拥有的各种组件版本是
- Spark spark-1.5.1-bin-hadoop2.6
- Hadoop hadoop-2.6.1
- Mongo 2.6.10
- Mongo-从 https://github.com/mongodb/mongo-hadoop.git 克隆的 Hadoop 连接器
- Python 2.7.10
我在执行各个步骤时遇到了一些困难,例如将哪些 jar 添加到哪个路径,所以我添加的是
- 在
/usr/local/share/hadoop-2.6.1/share/hadoop/mapreduce
我已经添加了mongo-hadoop-core-1.5.0-SNAPSHOT.jar
- 以下环境变量
export HADOOP_HOME="/usr/local/share/hadoop-2.6.1"
export PATH=$PATH:$HADOOP_HOME/bin
export SPARK_HOME="/usr/local/share/spark-1.5.1-bin-hadoop2.6"
export PYTHONPATH="/usr/local/share/mongo-hadoop/spark/src/main/python"
export PATH=$PATH:$SPARK_HOME/bin
我的Python程序很基础
from pyspark import SparkContext, SparkConf
import pymongo_spark
pymongo_spark.activate()
def main():
conf = SparkConf().setAppName("pyspark test")
sc = SparkContext(conf=conf)
rdd = sc.mongoRDD(
'mongodb://username:password@localhost:27017/mydb.mycollection')
if __name__ == '__main__':
main()
我正在运行使用命令
设置它$SPARK_HOME/bin/spark-submit --driver-class-path /usr/local/share/mongo-hadoop/spark/build/libs/ --master local[4] ~/sparkPythonExample/SparkPythonExample.py
结果我得到以下输出
Traceback (most recent call last):
File "/home/me/sparkPythonExample/SparkPythonExample.py", line 24, in <module>
main()
File "/home/me/sparkPythonExample/SparkPythonExample.py", line 17, in main
rdd = sc.mongoRDD('mongodb://username:password@localhost:27017/mydb.mycollection')
File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 161, in mongoRDD
return self.mongoPairRDD(connection_string, config).values()
File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 143, in mongoPairRDD
_ensure_pickles(self)
File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 80, in _ensure_pickles
orig_tb)
py4j.protocol.Py4JError
根据here
This exception is raised when an exception occurs in the Java client code. For example, if you try to pop an element from an empty stack. The instance of the Java exception thrown is stored in the java_exception member.
查看 pymongo_spark.py
的源代码和抛出错误的行,上面写着
"Error while communicating with the JVM. Is the MongoDB Spark jar on Spark's CLASSPATH? : "
所以作为回应,我试图确保传递正确的 jar,但我可能做错了,见下文
$SPARK_HOME/bin/spark-submit --jars /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar --driver-class-path /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar --master local[4] ~/sparkPythonExample/SparkPythonExample.py
我已将 pymongo
导入同一个 python 程序以验证我至少可以使用它访问 MongoDB,我可以。
我知道这里有很多变化的部分,所以如果我能提供更多有用的信息,请告诉我。
我昨天遇到了同样的问题。能够通过将 mongo-java-driver.jar
放在 $HADOOP_HOME/lib
和 mongo-hadoop-core.jar
和 mongo-hadoop-spark.jar
中 $HADOOP_HOME/spark/classpath/emr
(或 $SPARK_CLASSPATH
中的任何其他文件夹来修复它).
如果有帮助请告诉我。
您能否尝试在 spark-submit 命令中使用 --package
选项而不是 --jars ...
:
spark-submit --packages org.mongodb.mongo-hadoop:mongo-hadoop-core:1.3.1,org.mongodb:mongo-java-driver:3.1.0 [REST OF YOUR OPTIONS]
其中一些 jar 文件不是 Uber jar,需要下载更多依赖项才能开始工作。
更新:
2016-07-04
自上次更新 MongoDB Spark Connector matured quite a lot. It provides up-to-date binaries 和基于数据源 API 但它使用 SparkConf
配置,因此主观上不如 Stratio/Spark-MongoDB 灵活。
2016-03-30
自最初的答案以来,我发现了两种不同的方式来从 Spark 连接到 MongoDB:
虽然前者似乎相对不成熟,但后者看起来是比 Mongo-Hadoop 连接器更好的选择,并提供了 Spark SQL API.
# Adjust Scala and package version according to your setup
# although officially 0.11 supports only Spark 1.5
# I haven't encountered any issues on 1.6.1
bin/pyspark --packages com.stratio.datasource:spark-mongodb_2.11:0.11.0
df = (sqlContext.read
.format("com.stratio.datasource.mongodb")
.options(host="mongo:27017", database="foo", collection="bar")
.load())
df.show()
## +---+----+--------------------+
## | x| y| _id|
## +---+----+--------------------+
## |1.0|-1.0|56fbe6f6e4120712c...|
## |0.0| 4.0|56fbe701e4120712c...|
## +---+----+--------------------+
它似乎比mongo-hadoop-spark
稳定得多,支持谓词下推,无需静态配置,简单易用。
原回答:
确实,这里有很多活动部件。我试图通过构建一个简单的 Docker 图像来使其更易于管理,该图像与描述的配置大致匹配(尽管为简洁起见,我省略了 Hadoop 库)。您可以找到 complete source on GitHub
(DOI 10.5281/zenodo.47882) 并从头开始构建它:
git clone https://github.com/zero323/docker-mongo-spark.git
cd docker-mongo-spark
docker build -t zero323/mongo-spark .
或下载我 pushed to Docker Hub 的图片,这样您就可以 docker pull zero323/mongo-spark
):
开始图片:
docker run -d --name mongo mongo:2.6
docker run -i -t --link mongo:mongo zero323/mongo-spark /bin/bash
启动 PySpark shell 通过 --jars
和 --driver-class-path
:
pyspark --jars ${JARS} --driver-class-path ${SPARK_DRIVER_EXTRA_CLASSPATH}
最后看看它是如何工作的:
import pymongo
import pymongo_spark
mongo_url = 'mongodb://mongo:27017/'
client = pymongo.MongoClient(mongo_url)
client.foo.bar.insert_many([
{"x": 1.0, "y": -1.0}, {"x": 0.0, "y": 4.0}])
client.close()
pymongo_spark.activate()
rdd = (sc.mongoRDD('{0}foo.bar'.format(mongo_url))
.map(lambda doc: (doc.get('x'), doc.get('y'))))
rdd.collect()
## [(1.0, -1.0), (0.0, 4.0)]
请注意,mongo-hadoop 似乎会在第一次操作后关闭连接。因此在收集之后调用例如 rdd.count()
将抛出异常。
基于我在创建此图像时遇到的不同问题,我倾向于认为 传递 mongo-hadoop-1.5.0-SNAPSHOT.jar
和 mongo-hadoop-spark-1.5.0-SNAPSHOT.jar
--jars
和 --driver-class-path
是唯一的硬性要求.
备注:
- 此图像大致基于 jaceklaskowski/docker-spark 所以如果有帮助,请务必向 @jacek-laskowski 发送一些善意。
- 如果不需要包含 new API 的开发版本,那么使用
--packages
很可能是更好的选择。
祝你好运!
@见https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage
from pyspark import SparkContext, SparkConf
import pymongo_spark
# Important: activate pymongo_spark.
pymongo_spark.activate()
def main():
conf = SparkConf().setAppName("pyspark test")
sc = SparkContext(conf=conf)
# Create an RDD backed by the MongoDB collection.
# This RDD *does not* contain key/value pairs, just documents.
# If you want key/value pairs, use the mongoPairRDD method instead.
rdd = sc.mongoRDD('mongodb://localhost:27017/db.collection')
# Save this RDD back to MongoDB as a different collection.
rdd.saveToMongoDB('mongodb://localhost:27017/db.other.collection')
# You can also read and write BSON:
bson_rdd = sc.BSONFileRDD('/path/to/file.bson')
bson_rdd.saveToBSON('/path/to/bson/output')
if __name__ == '__main__':
main()