如何将 spark 结构流写入 mongodb 集合?
How to write spark structure stream into mongodb collection?
Spark 数据帧可以写入 mongodb 集合。参考 - https://docs.mongodb.com/spark-connector/master/python/write-to-mongodb/
但是当尝试将 spark 结构流写入 mongodb 集合时,它不起作用。
您能否提出比在 udf 中使用 pymongo 代码更好的选择来实现此目标。
已使用 foreachBatch
接收器解决。 PFB 工作示例代码。
def write_mongo_row(df, epoch_id):
mongoURL = "mongodb://XX.XX.XX.XX:27017/test.collection"
df.write.format("mongo").mode("append").option("uri",mongoURL).save()
pass
query=csvDF.writeStream.foreachBatch(write_mongo_row).start()
query.awaitTermination()
灵感来自
分享一个替代解决方案,在一开始就注意配置部分,而不是稍后在 save
方法中处理配置(将配置与逻辑分开)。
def save(message: DataFrame):
message.write \
.format("mongo") \
.mode("append") \
.option("database", "db_name") \
.option("collection", "collection_name") \
.save()
pass
spark: SparkSession = SparkSession \
.builder \
.appName("MyApp") \
.config("spark.mongodb.input.uri", "mongodb://localhost:27017") \
.config("spark.mongodb.output.uri", "mongodb://localhost:27017") \
.config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
.master("local") \
.getOrCreate()
df: DataFrame = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
query: StreamingQuery = df\
.writeStream \
.foreachBatch(save) \
.start()
query.awaitTermination()
Spark 数据帧可以写入 mongodb 集合。参考 - https://docs.mongodb.com/spark-connector/master/python/write-to-mongodb/
但是当尝试将 spark 结构流写入 mongodb 集合时,它不起作用。
您能否提出比在 udf 中使用 pymongo 代码更好的选择来实现此目标。
已使用 foreachBatch
接收器解决。 PFB 工作示例代码。
def write_mongo_row(df, epoch_id):
mongoURL = "mongodb://XX.XX.XX.XX:27017/test.collection"
df.write.format("mongo").mode("append").option("uri",mongoURL).save()
pass
query=csvDF.writeStream.foreachBatch(write_mongo_row).start()
query.awaitTermination()
灵感来自
分享一个替代解决方案,在一开始就注意配置部分,而不是稍后在 save
方法中处理配置(将配置与逻辑分开)。
def save(message: DataFrame):
message.write \
.format("mongo") \
.mode("append") \
.option("database", "db_name") \
.option("collection", "collection_name") \
.save()
pass
spark: SparkSession = SparkSession \
.builder \
.appName("MyApp") \
.config("spark.mongodb.input.uri", "mongodb://localhost:27017") \
.config("spark.mongodb.output.uri", "mongodb://localhost:27017") \
.config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
.master("local") \
.getOrCreate()
df: DataFrame = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
query: StreamingQuery = df\
.writeStream \
.foreachBatch(save) \
.start()
query.awaitTermination()