使用 MongoSpark 将流式数据帧保存到 MongoDB

Saving a streaming DataFrame to MongoDB using MongoSpark

一些背景故事:对于大学的家庭作业项目,我们的任务是以可扩展的方式实施选择的算法。我们选择使用 Scala、Spark、MongoDB 和 Kafka,因为这些是课程中推荐的。要从我们的 MongoDB 读取数据,我们选择使用 MongoSpark,因为它允许对数据进行简单且可扩展的操作。我们还使用 Kafka 来模拟来自外部源的流。我们需要对 Kafka 产生的每个条目执行多项操作。问题来自将此数据的结果保存回 MongoDB。

我们有以下代码:

val streamDF = sparkSession
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "aTopic")
    .load
    .selectExpr("CAST(value AS STRING)")

从这里开始,我们就不知所措了。我们不能使用 .map,因为 MongoSpark 仅在数据帧、数据集和 RDD 上运行,并且不可序列化,并且使用 MongoSpark.save 不适用于指定的流数据帧。我们也不能使用默认的 MongoDB Scala 驱动程序,因为这会在添加依赖项时与 MongoSpark 发生冲突。请注意,该算法的其余部分严重依赖于连接和 groupbys。

我们如何从这里获取数据到我们的MongoDB?

编辑: 对于一个易于重现的示例,可以尝试以下操作:

val streamDF = sparkSession
    .readStream
    .format("rate")
    .load

MongoSpark.save 添加 .write 将导致异常,因为无法在流式 DataFrame 上调用 write。

Adding a .write to that, which is required for MongoSpark.save, will cause an exception because write cannot be called on a streaming DataFrame.

使用MongoSpark MongoDB Connector for Spark accepts RDD (as of current version 2.2). When utilising DStreamsave()方法,需要在stream中获取'batches'个RDD进行写入。

wordCounts.foreachRDD({ rdd =>
  import spark.implicits._
  val wordCounts = rdd.map({ case (word: String, count: Int)
          => WordCount(word, count) }).toDF()
  wordCounts.write.mode("append").mongo()
})

另请参阅: