如何配置检查点以重新部署 Spark Streaming 应用程序?

How to config checkpoint to redeploy spark streaming application?

我正在使用 Spark 流式处理来计算唯一用户数。我使用 updateStateByKey,所以我需要配置一个检查点目录。我还在启动应用程序时从检查点加载数据,如 the example in the doc:

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

问题来了,如果我的代码改了,那我重新部署代码,不管代码改多少,checkpoint都会加载吗?或者我需要使用我自己的逻辑来持久化我的数据并在下一个 运行.

中加载它们

如果我使用自己的逻辑来保存和加载 DStream,那么如果应用程序失败重​​启,数据不会同时从检查点目录和我自己的数据库中加载吗?

检查点本身包括你的元数据、rdd、dag,甚至你的logic.If你改变你的逻辑并尝试从最后一个检查点运行它,你很可能会遇到异常。 如果您想使用自己的逻辑将数据保存在某处作为检查点,您可能需要实施一个 spark 操作以将检查点数据推送到任何数据库,在接下来的 运行 中,将检查点数据作为初始 RDD 加载(以防你使用 updateStateByKey API)并继续你的逻辑。

我在 Spark 邮件列表中问过这个问题并得到了答案,我已经在 my blog 上进行了分析。我将 post 总结在这里:

方法是同时使用检查点和我们自己的数据加载机制。但是我们将数据加载为 updateStateByKeyinitalRDD。所以在这两种情况下,数据既不会丢失也不会重复:

  1. 当我们更改代码并重新部署 Spark 应用程序时,我们优雅地关闭了旧的 Spark 应用程序并清理了检查点数据,因此唯一加载的数据是我们保存的数据。

  2. 当Spark应用程序失败并重新启动时,它会从检查点加载数据。但是 DAG 的步骤被省去了,所以它不会再次加载我们自己的数据作为 initalRDD。所以唯一加载的数据是检查点数据。