使用 MongoSpark 将流式数据帧保存到 MongoDB
Saving a streaming DataFrame to MongoDB using MongoSpark
一些背景故事:对于大学的家庭作业项目,我们的任务是以可扩展的方式实施选择的算法。我们选择使用 Scala、Spark、MongoDB 和 Kafka,因为这些是课程中推荐的。要从我们的 MongoDB 读取数据,我们选择使用 MongoSpark,因为它允许对数据进行简单且可扩展的操作。我们还使用 Kafka 来模拟来自外部源的流。我们需要对 Kafka 产生的每个条目执行多项操作。问题来自将此数据的结果保存回 MongoDB。
我们有以下代码:
val streamDF = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "aTopic")
.load
.selectExpr("CAST(value AS STRING)")
从这里开始,我们就不知所措了。我们不能使用 .map
,因为 MongoSpark 仅在数据帧、数据集和 RDD 上运行,并且不可序列化,并且使用 MongoSpark.save
不适用于指定的流数据帧。我们也不能使用默认的 MongoDB Scala 驱动程序,因为这会在添加依赖项时与 MongoSpark 发生冲突。请注意,该算法的其余部分严重依赖于连接和 groupbys。
我们如何从这里获取数据到我们的MongoDB?
编辑:
对于一个易于重现的示例,可以尝试以下操作:
val streamDF = sparkSession
.readStream
.format("rate")
.load
向 MongoSpark.save
添加 .write
将导致异常,因为无法在流式 DataFrame 上调用 write。
Adding a .write to that, which is required for MongoSpark.save, will cause an exception because write cannot be called on a streaming DataFrame.
使用MongoSpark MongoDB Connector for Spark accepts RDD (as of current version 2.2). When utilising DStream的save()
方法,需要在stream中获取'batches'个RDD进行写入。
wordCounts.foreachRDD({ rdd =>
import spark.implicits._
val wordCounts = rdd.map({ case (word: String, count: Int)
=> WordCount(word, count) }).toDF()
wordCounts.write.mode("append").mongo()
})
另请参阅:
一些背景故事:对于大学的家庭作业项目,我们的任务是以可扩展的方式实施选择的算法。我们选择使用 Scala、Spark、MongoDB 和 Kafka,因为这些是课程中推荐的。要从我们的 MongoDB 读取数据,我们选择使用 MongoSpark,因为它允许对数据进行简单且可扩展的操作。我们还使用 Kafka 来模拟来自外部源的流。我们需要对 Kafka 产生的每个条目执行多项操作。问题来自将此数据的结果保存回 MongoDB。
我们有以下代码:
val streamDF = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "aTopic")
.load
.selectExpr("CAST(value AS STRING)")
从这里开始,我们就不知所措了。我们不能使用 .map
,因为 MongoSpark 仅在数据帧、数据集和 RDD 上运行,并且不可序列化,并且使用 MongoSpark.save
不适用于指定的流数据帧。我们也不能使用默认的 MongoDB Scala 驱动程序,因为这会在添加依赖项时与 MongoSpark 发生冲突。请注意,该算法的其余部分严重依赖于连接和 groupbys。
我们如何从这里获取数据到我们的MongoDB?
编辑: 对于一个易于重现的示例,可以尝试以下操作:
val streamDF = sparkSession
.readStream
.format("rate")
.load
向 MongoSpark.save
添加 .write
将导致异常,因为无法在流式 DataFrame 上调用 write。
Adding a .write to that, which is required for MongoSpark.save, will cause an exception because write cannot be called on a streaming DataFrame.
使用MongoSpark MongoDB Connector for Spark accepts RDD (as of current version 2.2). When utilising DStream的save()
方法,需要在stream中获取'batches'个RDD进行写入。
wordCounts.foreachRDD({ rdd =>
import spark.implicits._
val wordCounts = rdd.map({ case (word: String, count: Int)
=> WordCount(word, count) }).toDF()
wordCounts.write.mode("append").mongo()
})
另请参阅: