如何在 Spark Streaming 应用程序的生命周期中使 SQLContext 实例保持活动状态?
How to keep a SQLContext instance alive in a spark streaming application's life cycle?
我在 spark streaming 应用程序中使用 SQLContext 作为 blew:
case class topic_name (f1: Int, f2: Int)
val sqlContext = new SQLContext(sc)
@transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
ssc.checkpoint(".")
val theDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))
theDStream.map(x => x._2).foreach { rdd =>
sqlContext.jsonRDD(newsIdRDD).registerTempTable("topic_name")
sqlContext.sql("select count(*) from topic_name").foreach { x =>
WriteToFile("file_path", x(0).toString)
}
}
ssc.start()
ssc.awaitTermination()
我发现我只能每 5 秒获取一次消息计数,因为 "The lifetime of this temporary table is tied to the SQLContext that was used to create this DataFrame",我猜每 5 秒就会创建一个新的 sqlContext 而临时 table 只能存活 5秒,我想让 sqlContext 和临时 table 在流式应用程序的整个生命周期中存活,该怎么做?
谢谢~
你是对的。 SQLContext 只记住为该对象的生命周期注册的表。因此,与其使用 registerTempTable,不如使用 saveAsTable 命令使用像 Hive 这样的持久存储。
我在 spark streaming 应用程序中使用 SQLContext 作为 blew:
case class topic_name (f1: Int, f2: Int)
val sqlContext = new SQLContext(sc)
@transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
ssc.checkpoint(".")
val theDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))
theDStream.map(x => x._2).foreach { rdd =>
sqlContext.jsonRDD(newsIdRDD).registerTempTable("topic_name")
sqlContext.sql("select count(*) from topic_name").foreach { x =>
WriteToFile("file_path", x(0).toString)
}
}
ssc.start()
ssc.awaitTermination()
我发现我只能每 5 秒获取一次消息计数,因为 "The lifetime of this temporary table is tied to the SQLContext that was used to create this DataFrame",我猜每 5 秒就会创建一个新的 sqlContext 而临时 table 只能存活 5秒,我想让 sqlContext 和临时 table 在流式应用程序的整个生命周期中存活,该怎么做?
谢谢~
你是对的。 SQLContext 只记住为该对象的生命周期注册的表。因此,与其使用 registerTempTable,不如使用 saveAsTable 命令使用像 Hive 这样的持久存储。