启动 Spark 流上下文时出错

Error in starting Spark streaming context

我是 Spark Streaming 的新手,正在为 Twitter 连接器编写代码。当我 运行 此代码不止一次时,它给出以下异常。每次我都必须创建一个新的 hdfs 目录用于检查点,才能成功 运行 而且它不会停止。

 ERROR StreamingContext: Error starting the context, marking it as stopped
    org.apache.spark.SparkException: org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been initialized
    at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute.apply(DStream.scala:342)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute.apply(DStream.scala:342)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
    at org.apache.spark.streaming.DStreamGraph$$anonfun.apply(DStreamGraph.scala:120)
    at org.apache.spark.streaming.DStreamGraph$$anonfun.apply(DStreamGraph.scala:120)
    at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart.apply(JobGenerator.scala:227)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart.apply(JobGenerator.scala:222)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
    at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
    at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
    at org.apache.spark.streaming.StreamingContext.liftedTree1(StreamingContext.scala:588)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
    at twitter.streamingSpark$.twitterConnector(App.scala:38)
    at twitter.streamingSpark$.main(App.scala:26)
    at twitter.streamingSpark.main(App.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:169)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

相关代码为

 def twitterConnector() :Unit =
 {
     val atwitter=managingCredentials()

   val ssc=StreamingContext.getOrCreate("hdfsDirectory",()=> {   managingContext() })
   fetchTweets(ssc, atwitter )

   ssc.start()             // Start the computation
   ssc.awaitTermination()

   }

   def managingContext():StreamingContext =
  {
   //making spark context
   val conf = new SparkConf().setMaster("local[*]").setAppName("twitterConnector")
   val ssc = new StreamingContext(conf, Seconds(1))
   val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
   import sqlContext.implicits._

   //checkpointing  
   ssc.checkpoint("hdfsDirectory")
   ssc
   }
    def fetchTweets (ssc : StreamingContext , atwitter : Option[twitter4j.auth.Authorization]) : Unit = {


   val tweets =TwitterUtils.createStream(ssc,atwitter,Nil,StorageLevel.MEMORY_AND_DISK_2)
   val twt = tweets.window(Seconds(10),Seconds(10))
  //checkpoint duration
  /twt.checkpoint(new Duration(1000))

   //processing
   case class Tweet(createdAt:Long, text:String)
   twt.map(status=>
   Tweet(status.getCreatedAt().getTime()/1000, status.getText())
   )
   twt.print()
  }

谁能在这方面帮助我?

您面临的问题应该与以下事实有关:您必须在 DStream 上管理您的转换和操作,应该在 managingContext 函数中执行。

因此,将 fetchTweets(ssc, atwitter ) 移到 managingContext 中应该可以解决问题。