Spark streaming checkpoints for DStreams

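In Spark Streaming it is possible (and mandatory if you are going to use stateful transformations) to set up the StreamingContext to checkpoint both metadata and DStream lineage into a reliable data store (S3, HDFS, ...).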

As described in the documentation, to set up the output data store you need to call `yourSparkStreamingCtx.checkpoint(datastoreURL)`.

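On the other hand, a lineage checkpoint interval can be set for each DStream by calling `checkpoint(timeInterval)` on it. In fact, the documentation recommends setting the lineage checkpoint interval to between 5 and 10 times the DStream's sliding interval: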

> ... `dstream.checkpoint(checkpointInterval)`. Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try.
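To make the "5 - 10 sliding intervals" guideline concrete, here is a minimal, Spark-free Scala sketch that turns a slide interval into the suggested range of checkpoint intervals. The object `CheckpointRule`, the use of plain milliseconds instead of Spark's `Duration`, and the extra "whole number of slides" check are assumptions for illustration; in a real job the chosen value would be passed to `dstream.checkpoint(...)`.

```scala
// Hypothetical helper (not Spark API): models the "5 - 10 sliding intervals"
// guideline from the Spark Streaming docs using plain milliseconds.
object CheckpointRule {
  /** Returns the (lower, upper) suggested checkpoint interval in ms. */
  def suggestedRange(slideMs: Long): (Long, Long) = {
    require(slideMs > 0, "slide interval must be positive")
    (5 * slideMs, 10 * slideMs)
  }

  /** True if `intervalMs` lies in the suggested range and, as an assumption of
    * this sketch, is a whole number of slide intervals. */
  def isWithinGuideline(intervalMs: Long, slideMs: Long): Boolean = {
    val (lo, hi) = suggestedRange(slideMs)
    intervalMs % slideMs == 0 && intervalMs >= lo && intervalMs <= hi
  }
}
```

For example, with a 2-second slide the sketch suggests checkpointing every 10 to 20 seconds.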

My question is:

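When the streaming context has been set up to perform checkpoints and `ds.checkpoint(interval)` is never called, is lineage checkpointing enabled for all DStreams with a default `checkpointInterval` equal to `batchInterval`? Or, on the contrary, is only metadata checkpointing enabled?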

Checking the Spark code (v1.5), I found that lineage checkpointing of DStreams is enabled in two cases:

By explicitly calling their `checkpoint` method (not the StreamingContext's):

```scala
/**
 * Enable periodic checkpointing of RDDs of this DStream
 * @param interval Time interval after which generated RDD will be checkpointed
 */
def checkpoint(interval: Duration): DStream[T] = {
  if (isInitialized) {
    throw new UnsupportedOperationException(
      "Cannot change checkpoint interval of an DStream after streaming context has started")
  }
  persist()
  checkpointDuration = interval
  this
}
```
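What matters for the question is that this method only records an interval on the DStream it is called on; nothing in it consults the StreamingContext's checkpoint directory. The Spark-free sketch below models that behaviour. `ModelDStream`, its millisecond fields and its `start()` method are hypothetical stand-ins, not Spark API.

```scala
// Hypothetical model of DStream.checkpoint(interval) as quoted above:
// reject changes after start, mark the stream for persistence, store the interval.
final class ModelDStream(val slideMs: Long) {
  private var initialized = false
  var persisted = false
  var checkpointMs: Option[Long] = None

  def checkpoint(intervalMs: Long): ModelDStream = {
    if (initialized)
      throw new UnsupportedOperationException(
        "Cannot change checkpoint interval of a DStream after streaming context has started")
    persisted = true                // mirrors the persist() call
    checkpointMs = Some(intervalMs) // mirrors checkpointDuration = interval
    this
  }

  // Stands in for the streaming context being started (which initializes DStreams).
  def start(): Unit = { initialized = true }
}
```

Calling `checkpoint` before `start()` succeeds and returns the same object; calling it afterwards throws, and the previously stored interval is left unchanged.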

When the DStream is initialized, provided that the concrete `DStream` subclass overrides the `mustCheckpoint` property (setting it to `true`):

```scala
private[streaming] def initialize(time: Time) {
  // ...
  // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
  if (mustCheckpoint && checkpointDuration == null) {
    checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
    logInfo("Checkpoint interval automatically set to " + checkpointDuration)
  }
  // ...
}
```
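The comment in that code is slightly loose: the formula yields the smallest multiple of the slide duration that is at least 10 seconds, so a 3-second slide gives 12 seconds rather than exactly 10. A Spark-free sketch of the same arithmetic, assuming durations are represented as plain milliseconds (the object name is hypothetical):

```scala
// Re-implementation of the default-interval formula quoted above:
//   slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
// using plain milliseconds instead of Spark's Duration type.
object DefaultCheckpointInterval {
  val TenSecondsMs: Long = 10000L

  def apply(slideMs: Long): Long = {
    require(slideMs > 0, "slide interval must be positive")
    // The ratio is a Double, as required by math.ceil in the quoted code.
    val multiples = math.ceil(TenSecondsMs.toDouble / slideMs.toDouble).toInt
    slideMs * multiples
  }
}
```

With this formula a 1-second slide gives 10 seconds, a 3-second slide gives 12 seconds, and a 15-second slide gives 15 seconds.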

The first case is obvious. For the second, a simple search of the Spark Streaming source code:

```
$ grep "val mustCheckpoint = true" $(find . -type f -name "*.scala")
./org/apache/spark/streaming/api/python/PythonDStream.scala:  override val mustCheckpoint = true
./org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala:  override val mustCheckpoint = true
./org/apache/spark/streaming/dstream/StateDStream.scala:  override val mustCheckpoint = true
```
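The grep pattern only matches when there is exactly one space between tokens. As a hedged alternative, this Scala sketch walks a source tree with `java.nio.file.Files.walk` and applies a whitespace-tolerant regular expression. The object name `MustCheckpointScan` is hypothetical, and the root directory (for example, an extracted Spark Streaming source tree) is whatever you pass in.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._
import scala.util.Using

// Hypothetical counterpart of the grep above: list .scala files under `root`
// that contain `val mustCheckpoint = true`, allowing any amount of whitespace.
object MustCheckpointScan {
  private val Pattern = """\bval\s+mustCheckpoint\s*=\s*true\b""".r

  def scan(root: Path): List[Path] =
    Using.resource(Files.walk(root)) { paths =>   // closes the directory stream
      paths.iterator().asScala
        .filter(p => Files.isRegularFile(p) && p.toString.endsWith(".scala"))
        .filter { p =>
          val text = new String(Files.readAllBytes(p), StandardCharsets.UTF_8)
          Pattern.findFirstIn(text).isDefined
        }
        .toList
        .sortBy(_.toString)
    }
}
```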

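I found that, in general (ignoring `PythonDStream`), setting up `StreamingContext` checkpointing only enables lineage checkpointing for `StateDStream` and `ReducedWindowedDStream` instances. These instances are the results of the following transformations, respectively:

  • `updateStateByKey`: that is, the stream that provides a state across several windows.
  • `reduceByKeyAndWindow` (the variant that takes an inverse reduce function)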
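The two code paths can be summarized as a small decision function. This is a Spark-free sketch, not Spark code: an explicit `checkpoint(interval)` call wins; otherwise only DStreams whose class sets `mustCheckpoint = true` get the automatic interval, and every other DStream gets no lineage checkpointing at all. The object name and the millisecond representation are assumptions for illustration.

```scala
// Hypothetical summary of the Spark 1.5 logic quoted above (plain milliseconds).
object LineageCheckpointing {
  def effectiveIntervalMs(explicitMs: Option[Long],
                          mustCheckpoint: Boolean,
                          slideMs: Long): Option[Long] = {
    require(slideMs > 0, "slide interval must be positive")
    explicitMs.orElse {
      // Same formula as in initialize(): smallest multiple of the slide >= 10 s.
      if (mustCheckpoint) Some(slideMs * math.ceil(10000.0 / slideMs).toInt)
      else None
    }
  }
}
```

Under this model, a plain `map` or `filter` output stream with no explicit call returns `None`, while a `StateDStream` with a 3-second slide and no explicit interval returns 12 seconds.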