如何在 Spark Streaming 应用程序中初始化 DStream 的状态
How to Initialize the State of DStream in a spark streaming application
我有一个 spark-streaming 应用程序,基本上跟踪字符串-> 字符串字典。
所以我收到了更新消息,例如:
“A”->“B”
我需要更新字典。
这似乎是 updateStateByKey 方法的一个简单用例。
但是,我的问题是,当应用程序启动时,我需要使用来自配置单元 table 的数据“初始化”字典,其中包含字典的所有历史 key/values。
我能想到的唯一方法是做类似的事情:
val rdd =… //get data from hive
def process(input: DStream[(String, String)]) = {
input.join(rdd).updateStateByKey(update)
}
所以连接操作将在每个传入缓冲区上完成,实际上我只在初始化时需要它。
知道如何实现吗?
谢谢
PairDStreamFunctions.updateStateByKey
有一个 overload accepting an initialRDD 这似乎是你需要的:
updateStateByKey[S](updateFunc: (Seq[V], Option[S]) ⇒ Option[S], partitioner: Partitioner, initialRDD: RDD[(K, S)])(implicit arg0: ClassTag[S]): DStream[(K, S)]
我有一个 spark-streaming 应用程序,基本上跟踪字符串-> 字符串字典。
所以我收到了更新消息,例如:
“A”->“B”
我需要更新字典。
这似乎是 updateStateByKey 方法的一个简单用例。
但是,我的问题是,当应用程序启动时,我需要使用来自配置单元 table 的数据“初始化”字典,其中包含字典的所有历史 key/values。
我能想到的唯一方法是做类似的事情:
val rdd =… //get data from hive
def process(input: DStream[(String, String)]) = {
input.join(rdd).updateStateByKey(update)
}
所以连接操作将在每个传入缓冲区上完成,实际上我只在初始化时需要它。
知道如何实现吗?
谢谢
PairDStreamFunctions.updateStateByKey
有一个 overload accepting an initialRDD 这似乎是你需要的:
updateStateByKey[S](updateFunc: (Seq[V], Option[S]) ⇒ Option[S], partitioner: Partitioner, initialRDD: RDD[(K, S)])(implicit arg0: ClassTag[S]): DStream[(K, S)]