从 Python 中的 MongoDB GridFS 加载 Spark 2.x DataFrame

Loading Spark 2.x DataFrame from MongoDB GridFS in Python

我在 elephas 下使用 pyspark sql 和 keras。

我想尝试使用 mongoDB GridFS

进行某种分布式图像处理

我找到了相关问题,但是在 Java 世界上的 Scala Loading a Spark 2.x DataFrame from MongoDB GridFS

但仅此而已,我找不到任何其他关于如何使用 pySpark 的 GridFS 的文档。

我的 pyspark - mongo 代码如下所示:

sparkConf = SparkConf().setMaster("local[4]").setAppName("MongoSparkConnectorTour")\
                                             .set("spark.app.id", "MongoSparkConnectorTour")\
                                             .set("spark.mongodb.input.database", config.MONGO_DB)

# If executed via pyspark, sc is already instantiated
sc = SparkContext(conf=sparkConf)
sqlContext = SQLContext(sc)

dk = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")\
                    .option("spark.mongodb.input.uri", config.MONGO_MED_EVENTS)\
                    .load()

if (dk.count() > 0):
    # print data frame schema
    dk.printSchema()

    # Preview Dataframe (Pandas Preview is Cleaner)
    print( dk.limit(5).toPandas() )

是否可以通过这种方式处理 GridFS 数据?我想看看最小的例子。

有一种方法可以将 Scala 代码转换为 Pyspark。

  1. https://mvnrepository.com/artifact/org.mongodb.mongo-hadoop/mongo-hadoop-core/2.0.2

  2. 下载mongo-hadoop-core.jar
  3. 运行 包含 jar 的 pyspark:

SPARK_CLASSPATH=./path/to/mongo-hadoop-core.jar pyspark
  1. 和翻译代码:
sc = SparkContext(conf=sparkConf)

mongo_conf = {
    "mongo.input.uri": "mongodb://..."
    "mongo.input.query": s"...mongo query here..."
}

rdd = sc.newAPIHadoopRDD("com.mongodb.hadoop.GridFSInputFormat", keyClass="org.apache.hadoop.io.NullWritable", valueClass="org.apache.hadoop.io.MapWritable", conf=conf)

我对 keyClassvalueClass 不是百分百确定,所以这里是我用来编译此代码的来源: