Load/import CSV 文件到 mongodb 使用 PYSPARK

Load/import CSV file in to mongodb using PYSPARK

我想知道如何使用 pyspark load/import 将 CSV 文件导入 mongodb。我在桌面上放置了一个名为 cal.csv 的 csv 文件。有人可以分享代码片段吗?

首先读取 csv as pyspark 数据框。

from pyspark import SparkConf,SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(conf = conf)
sql = SQLContext(sc)

df = sql.read.csv("cal.csv", header=True, mode="DROPMALFORMED")

然后写到mongodb,

df.write.format('com.mongodb.spark.sql.DefaultSource').mode('append')\
        .option('database',NAME).option('collection',COLLECTION_MONGODB).save()

指定您创建的 NAMECOLLECTION_MONGODB

另外,你需要根据你的版本给conf和packages以及spark-submit,

/bin/spark-submit --conf "spark.mongodb.inuri=mongodb://127.0.0.1/DATABASE.COLLECTION_NAME?readPreference=primaryPreferred"
                  --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/DATABASE.COLLECTION_NAME" 
                  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0
                  tester.py

在上面指定 COLLECTION_NAMEDATABASEtester.py假设代码文件的名称。有关详细信息,请参阅 this.

这对我有用。 database:peopleCollection:con

pyspark --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/people.con?readPreference=primaryPreferred" \
    --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/people.con" \
    --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0


from pyspark.sql import SparkSession

my_spark = SparkSession \
         .builder \
         .appName("myApp") \
         .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/people.con") \
         .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/people.con") \
         .getOrCreate()

df = spark.read.csv(path = "file:///home/user/Desktop/people.csv", header=True, inferSchema=True)

df.printSchema()

df.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").option("database","people").option("collection", "con").save()

接下来转到mongo并按照以下步骤检查集合是否已写入

mongo
show dbs
use people
show collections
db.con.find().pretty()