如何将 spark 结构流写入 mongodb 集合?

How to write spark structure stream into mongodb collection?

Spark 数据帧可以写入 mongodb 集合。参考 - https://docs.mongodb.com/spark-connector/master/python/write-to-mongodb/

但是当尝试将 spark 结构流写入 mongodb 集合时,它不起作用。

您能否提出比在 udf 中使用 pymongo 代码更好的选择来实现此目标。

已使用 foreachBatch 接收器解决。 PFB 工作示例代码。

def write_mongo_row(df, epoch_id):
    mongoURL = "mongodb://XX.XX.XX.XX:27017/test.collection"
    df.write.format("mongo").mode("append").option("uri",mongoURL).save()
    pass

query=csvDF.writeStream.foreachBatch(write_mongo_row).start()
query.awaitTermination()

灵感来自

分享一个替代解决方案,在一开始就注意配置部分,而不是稍后在 save 方法中处理配置(将配置与逻辑分开)。

def save(message: DataFrame):
    message.write \
        .format("mongo") \
        .mode("append") \
        .option("database", "db_name") \
        .option("collection", "collection_name") \
        .save()
    pass

spark: SparkSession = SparkSession \
    .builder \
    .appName("MyApp") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .master("local") \
    .getOrCreate()

df: DataFrame = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

query: StreamingQuery = df\
    .writeStream \
    .foreachBatch(save) \
    .start()

query.awaitTermination()