如何在 Spark Streaming 应用程序的生命周期中使 SQLContext 实例保持活动状态?

How to keep a SQLContext instance alive in a spark streaming application's life cycle?

我在 spark streaming 应用程序中使用 SQLContext 作为 blew:

case class topic_name (f1: Int, f2: Int)

val sqlContext = new SQLContext(sc)
@transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
ssc.checkpoint(".")
val theDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))

theDStream.map(x => x._2).foreach { rdd =>
  sqlContext.jsonRDD(newsIdRDD).registerTempTable("topic_name")
  sqlContext.sql("select count(*) from topic_name").foreach { x => 
    WriteToFile("file_path", x(0).toString)
  }
}

ssc.start()
ssc.awaitTermination()

我发现我只能每 5 秒获取一次消息计数,因为 "The lifetime of this temporary table is tied to the SQLContext that was used to create this DataFrame",我猜每 5 秒就会创建一个新的 sqlContext 而临时 table 只能存活 5秒,我想让 sqlContext 和临时 table 在流式应用程序的整个生命周期中存活,该怎么做?

谢谢~

你是对的。 SQLContext 只记住为该对象的生命周期注册的表。因此,与其使用 registerTempTable,不如使用 saveAsTable 命令使用像 Hive 这样的持久存储。