从 Python 中的 MongoDB GridFS 加载 Spark 2.x DataFrame
Loading Spark 2.x DataFrame from MongoDB GridFS in Python
我在 elephas 下使用 pyspark sql 和 keras。
我想尝试使用 mongoDB GridFS
进行某种分布式图像处理
我找到了相关问题,但是在 Java 世界上的 Scala Loading a Spark 2.x DataFrame from MongoDB GridFS
但仅此而已,我找不到任何其他关于如何使用 pySpark 的 GridFS 的文档。
我的 pyspark - mongo 代码如下所示:
sparkConf = SparkConf().setMaster("local[4]").setAppName("MongoSparkConnectorTour")\
.set("spark.app.id", "MongoSparkConnectorTour")\
.set("spark.mongodb.input.database", config.MONGO_DB)
# If executed via pyspark, sc is already instantiated
sc = SparkContext(conf=sparkConf)
sqlContext = SQLContext(sc)
dk = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")\
.option("spark.mongodb.input.uri", config.MONGO_MED_EVENTS)\
.load()
if (dk.count() > 0):
# print data frame schema
dk.printSchema()
# Preview Dataframe (Pandas Preview is Cleaner)
print( dk.limit(5).toPandas() )
是否可以通过这种方式处理 GridFS 数据?我想看看最小的例子。
有一种方法可以将 Scala 代码转换为 Pyspark。
从https://mvnrepository.com/artifact/org.mongodb.mongo-hadoop/mongo-hadoop-core/2.0.2
下载mongo-hadoop-core.jar
运行 包含 jar 的 pyspark:
SPARK_CLASSPATH=./path/to/mongo-hadoop-core.jar pyspark
- 和翻译代码:
sc = SparkContext(conf=sparkConf)
mongo_conf = {
"mongo.input.uri": "mongodb://..."
"mongo.input.query": s"...mongo query here..."
}
rdd = sc.newAPIHadoopRDD("com.mongodb.hadoop.GridFSInputFormat", keyClass="org.apache.hadoop.io.NullWritable", valueClass="org.apache.hadoop.io.MapWritable", conf=conf)
我对 keyClass
和 valueClass
不是百分百确定,所以这里是我用来编译此代码的来源:
- https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage
- https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.newAPIHadoopFile
- http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-create-rdd-with-pyspark-newAPIHadoopRDD-td10358.html
- Loading a Spark 2.x DataFrame from MongoDB GridFS
我在 elephas 下使用 pyspark sql 和 keras。
我想尝试使用 mongoDB GridFS
进行某种分布式图像处理我找到了相关问题,但是在 Java 世界上的 Scala Loading a Spark 2.x DataFrame from MongoDB GridFS
但仅此而已,我找不到任何其他关于如何使用 pySpark 的 GridFS 的文档。
我的 pyspark - mongo 代码如下所示:
sparkConf = SparkConf().setMaster("local[4]").setAppName("MongoSparkConnectorTour")\
.set("spark.app.id", "MongoSparkConnectorTour")\
.set("spark.mongodb.input.database", config.MONGO_DB)
# If executed via pyspark, sc is already instantiated
sc = SparkContext(conf=sparkConf)
sqlContext = SQLContext(sc)
dk = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")\
.option("spark.mongodb.input.uri", config.MONGO_MED_EVENTS)\
.load()
if (dk.count() > 0):
# print data frame schema
dk.printSchema()
# Preview Dataframe (Pandas Preview is Cleaner)
print( dk.limit(5).toPandas() )
是否可以通过这种方式处理 GridFS 数据?我想看看最小的例子。
有一种方法可以将 Scala 代码转换为 Pyspark。
从https://mvnrepository.com/artifact/org.mongodb.mongo-hadoop/mongo-hadoop-core/2.0.2
下载mongo-hadoop-core.jar
运行 包含 jar 的 pyspark:
SPARK_CLASSPATH=./path/to/mongo-hadoop-core.jar pyspark
- 和翻译代码:
sc = SparkContext(conf=sparkConf)
mongo_conf = {
"mongo.input.uri": "mongodb://..."
"mongo.input.query": s"...mongo query here..."
}
rdd = sc.newAPIHadoopRDD("com.mongodb.hadoop.GridFSInputFormat", keyClass="org.apache.hadoop.io.NullWritable", valueClass="org.apache.hadoop.io.MapWritable", conf=conf)
我对 keyClass
和 valueClass
不是百分百确定,所以这里是我用来编译此代码的来源:
- https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage
- https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.newAPIHadoopFile
- http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-create-rdd-with-pyspark-newAPIHadoopRDD-td10358.html
- Loading a Spark 2.x DataFrame from MongoDB GridFS