I want to store each RDD from a Twitter stream in a database using Apache Spark, but I get a "Task not serializable" error in Scala

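I wrote code in which the Twitter stream maps tweets to RDDs of the Tweet case class and stores each RDD in a database, but it fails with a "Task not serializable" error. Here is the code.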

sparkstreaming.scala

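import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

case class Tweet(id: Long, source: String, content: String, retweet: Boolean, authName: String, username: String, url: String, authId: Long, language: String)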

trait SparkStreaming extends Connector {

  def startStream(appName: String, master: String): StreamingContext = {
    val db = connector("localhost", "rmongo", "rmongo", "pass")
    val dbcrud = new DBCrud(db, "table1")
    // Note: the config key must not start with a space, or Spark silently ignores it.
    val sparkConf: SparkConf = new SparkConf()
      .setAppName(appName)
      .setMaster(master)
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //  .set("spark.kryo.registrator", "HelloKryoRegistrator")
    //    sparkConf.registerKryoClasses(Array(classOf[DBCrud]))
    val sc: SparkContext = new SparkContext(sparkConf)
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(10))
    ssc
  }
}
object SparkStreaming extends SparkStreaming
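The commented-out Kryo lines suggest an attempt to make DBCrud shippable by registering it with Kryo. That is unlikely to help: `spark.serializer` applies to data such as shuffled or cached records, while Spark serializes task closures with Java serialization, which is where "Task not serializable" comes from. One common workaround is a small `Serializable` wrapper that carries only the connection settings and opens the connection lazily in a `@transient lazy val`. The sketch below illustrates that idea without Spark; `FakeConnection` and `MongoSink` are hypothetical stand-ins, not part of the question's code or of any MongoDB driver, and Scala 2.12/2.13 is assumed.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a real database connection; deliberately NOT Serializable.
class FakeConnection(val host: String) {
  def insert(doc: String): String = s"inserted '$doc' via $host"
}

// Hypothetical serializable sink: only `host` is serialized. The connection is
// @transient, so serialization skips it and it is re-created lazily on first use
// after deserialization (i.e. on the executor that received the copy).
class MongoSink(host: String) extends Serializable {
  @transient lazy val connection: FakeConnection = new FakeConnection(host)
  def insert(doc: String): String = connection.insert(doc)
}

object TransientSinkDemo {
  // Java serialization round trip, the same mechanism Spark uses for task closures.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T]
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val sink = new MongoSink("localhost")
    sink.insert("warm-up")          // opens the connection on the "driver" side
    val copy = roundTrip(sink)      // still succeeds: the transient field is skipped
    println(copy.insert("tweet-1")) // prints: inserted 'tweet-1' via localhost
  }
}
```

In Spark, such a wrapper would be created on the driver and used inside `foreachRDD`/`foreachPartition`; only the host string travels with the task.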

I use this streaming context in the Play controller to store the tweets in the database, but it throws an exception. I am using MongoDB for storage.

def streamstart = Action {
    val stream = SparkStreaming
    val a = stream.startStream("ss", "local[2]")
    val db = connector("localhost", "rmongo", "rmongo", "pass")
    val dbcrud = DBCrud
    val twitterauth = new TwitterClient().tweetCredantials()
    val tweetDstream = TwitterUtils.createStream(a, Option(twitterauth.getAuthorization))
    val tweets = tweetDstream
      .filter { x => x.getUser.getLang == "en" }
      .map { x =>
        Tweet(x.getId, x.getSource, x.getText, x.isRetweet(), x.getUser.getName,
          x.getUser.getScreenName, x.getUser.getURL, x.getUser.getId, x.getUser.getLang)
      }
    //  tweets.foreachRDD { x => x.foreach { x => dbcrud.insert(x) } }   // this version throws "Task not serializable"
    tweets.saveAsTextFiles("/home/knoldus/sentiment project/spark services/tweets/tweets")
    //    val s=new BirdTweet() 
    //    s.hastag(a.sparkContext)
    a.start()
    Ok("start streaming")
  }
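The commented-out `foreachRDD` line most likely fails because the function passed to `x.foreach` refers to `dbcrud`, a value that lives on the driver (here the `DBCrud` object itself). Spark has to serialize that function, together with everything it captures, to send it to the executors, and a captured object that is not `java.io.Serializable` makes the whole task unserializable. The sketch below reproduces the effect with plain Java serialization and no Spark; `FakeDBCrud` is a hypothetical stand-in for `DBCrud`, and Scala 2.12 or later is assumed, where function literals are serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for DBCrud: an ordinary class that is NOT Serializable.
class FakeDBCrud(table: String) {
  def insert(doc: String): String = s"$table <- $doc"
}

object ClosureCaptureDemo {
  // True if Java serialization (what Spark uses for task closures) accepts `obj`.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Mirrors the commented-out line: the function captures a driver-side instance.
  def capturingClosure(): String => String = {
    val dbcrud = new FakeDBCrud("table1")
    (tweet: String) => dbcrud.insert(tweet)
  }

  // The answer's approach: build the instance inside the function, so nothing is captured.
  def selfContainedClosure(): String => String =
    (tweet: String) => new FakeDBCrud("table1").insert(tweet)

  def main(args: Array[String]): Unit = {
    println(serializes(capturingClosure()))     // false: FakeDBCrud is captured
    println(serializes(selfContainedClosure())) // true: nothing is captured
  }
}
```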

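When I build the stream that receives tweets and stores each one with foreachRDD all in one place, it works, but when I use the streaming context from outside (in the controller), it does not.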

Please help.
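One possible reason the same code behaves differently inside a Play controller: if the closure calls a method or reads a field of the enclosing class (for example a `connector(...)` helper inherited by the controller, which the unqualified call in the controller suggests), Scala has to capture `this`, so Spark tries to serialize the whole controller. Helpers in a standalone top-level `object` are accessed statically and are not captured. The sketch below shows the difference with plain Java serialization; `FakeController` and `ConnectorHelpers` are hypothetical names used only for illustration, and Scala 2.12 or later is assumed.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper in a top-level object: reached statically, never captured.
object ConnectorHelpers {
  def connector(host: String): String = s"connection to $host"
}

// Hypothetical stand-in for the Play controller; NOT Serializable.
class FakeController {
  // Plays the role of a `connector(...)` method inherited by the controller.
  def connector(host: String): String = s"connection to $host"

  // Calls a method of the enclosing instance, so the closure captures `this`.
  def capturesThis: String => String =
    (tweet: String) => connector("localhost") + ": " + tweet

  // Calls the object's method instead, so the closure captures nothing.
  def capturesNothing: String => String =
    (tweet: String) => ConnectorHelpers.connector("localhost") + ": " + tweet
}

object CaptureThisDemo {
  // True if Java serialization (what Spark uses for task closures) accepts `obj`.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val controller = new FakeController
    println(serializes(controller.capturesThis))    // false: drags in FakeController
    println(serializes(controller.capturesNothing)) // true
  }
}
```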

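Try creating the connection to MongoDB inside the foreachRDD block, where the records are processed on the executors, as described in the Spark documentation ("Design Patterns for using foreachRDD" in the Spark Streaming programming guide):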
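tweets.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Runs on the executor, so the connection is never serialized from the driver.
    // One connection per partition is cheaper than one per record.
    val db = connector("localhost", "rmongo", "rmongo", "pass")
    val dbcrud = new DBCrud(db, "table1")
    partition.foreach { tweet => dbcrud.insert(tweet) }
  }
}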
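Opening a connection per partition works, but the Spark Streaming guide also suggests reusing connections through a static, lazily initialized pool. In Scala, a top-level `object` with a `lazy val` gives one instance per executor JVM, and referring to it inside a closure captures nothing. The following is a Spark-free sketch of that pattern; `FakeMongoClient` and `MongoClientHolder` are hypothetical, `writeAll` only simulates `rdd.foreachPartition`, and a real version would also need to close the client on shutdown.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a real Mongo client.
class FakeMongoClient(host: String) {
  def insert(doc: String): String = s"$host <- $doc"
}

// Hypothetical holder: one client per JVM (per executor), created on first use.
object MongoClientHolder {
  val created = new AtomicInteger(0) // demo only: counts how often the client is built
  lazy val client: FakeMongoClient = {
    created.incrementAndGet()
    new FakeMongoClient("localhost")
  }
}

object HolderDemo {
  // Simulates rdd.foreachPartition: each inner Seq stands for one partition.
  def writeAll(partitions: Seq[Seq[String]]): Seq[String] =
    partitions.flatMap { partition =>
      val client = MongoClientHolder.client // same instance for every partition in this JVM
      partition.map(client.insert)
    }

  def main(args: Array[String]): Unit = {
    println(writeAll(Seq(Seq("t1", "t2"), Seq("t3"))))
    println(MongoClientHolder.created.get) // 1: the client was built only once
  }
}
```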