如何在 Spark Streaming 应用程序中初始化 DStream 的状态

How to Initialize the State of DStream in a spark streaming application

我有一个 spark-streaming 应用程序,基本上跟踪字符串-> 字符串字典。

所以我收到了更新消息,例如:

“A”->“B”

我需要更新字典。

这似乎是 updateStateByKey 方法的一个简单用例。

但是,我的问题是,当应用程序启动时,我需要使用来自配置单元 table 的数据“初始化”字典,其中包含字典的所有历史 key/values。

我能想到的唯一方法是做类似的事情:

val rdd =… //get data from hive
def process(input: DStream[(String, String)]) = {
    input.join(rdd).updateStateByKey(update)
}

所以连接操作将在每个传入缓冲区上完成,实际上我只在初始化时需要它。

知道如何实现吗?

谢谢

PairDStreamFunctions.updateStateByKey 有一个 overload accepting an initialRDD 这似乎是你需要的:

updateStateByKey[S](updateFunc: (Seq[V], Option[S]) ⇒ Option[S], partitioner: Partitioner, initialRDD: RDD[(K, S)])(implicit arg0: ClassTag[S]): DStream[(K, S)]