Spark streaming checkpoints for DStreams
In Spark Streaming it is possible (and mandatory if you are going to use stateful operations) to set up the StreamingContext to perform checkpointing into a reliable data store (S3, HDFS, ...) of both:
- metadata
- DStream lineage
As described here, to set the output data store you need to call yourSparkStreamingCtx.checkpoint(datastoreURL)
On the other hand, it is possible to set a lineage checkpoint interval for each DStream by calling checkpoint(timeInterval). In fact, the recommendation is to set the lineage checkpoint interval to between 5 and 10 times the DStream's sliding interval:

dstream.checkpoint(checkpointInterval). Typically, a checkpoint
interval of 5 - 10 sliding intervals of a DStream is a good setting to
try.
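To make the 5x-10x recommendation concrete, it can be sketched as a tiny helper. This function is purely illustrative, not part of Spark's API, and durations are modeled as plain milliseconds rather than Spark's Duration class:

```scala
// Illustrative helper, not part of Spark's API: given a DStream's slide
// duration in milliseconds, return the 5x-10x range of checkpoint
// intervals that the documentation suggests trying.
def recommendedCheckpointRangeMs(slideMs: Long): (Long, Long) =
  (5 * slideMs, 10 * slideMs)
```

For a window sliding every 2 seconds, this yields checkpoint intervals between 10 and 20 seconds.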
My question is:

When the streaming context has been set up to perform checkpointing, and ds.checkpoint(interval) is not called, is lineage checkpointing enabled for all DStreams with a default checkpointInterval equal to batchInterval? Or is, on the contrary, only metadata checkpointing enabled?
Checking the Spark code (v1.5), I found that checkpointing of DStreams is enabled under two circumstances:

By an explicit call to their checkpoint method (not StreamingContext's):
/**
* Enable periodic checkpointing of RDDs of this DStream
* @param interval Time interval after which generated RDD will be checkpointed
*/
def checkpoint(interval: Duration): DStream[T] = {
  if (isInitialized) {
    throw new UnsupportedOperationException(
      "Cannot change checkpoint interval of an DStream after streaming context has started")
  }
  persist()
  checkpointDuration = interval
  this
}
Upon DStream initialization, as long as the concrete 'DStream' subclass has overridden the mustCheckpoint attribute (setting it to true):
private[streaming] def initialize(time: Time) {
  ...
  ...
  // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
  if (mustCheckpoint && checkpointDuration == null) {
    checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
    logInfo("Checkpoint interval automatically set to " + checkpointDuration)
  }
  ...
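The automatic interval computed in the snippet above can be reproduced in plain Scala. This is a sketch of the formula only, with durations modeled as milliseconds instead of Spark's Duration class:

```scala
// Mirrors the rule in the quoted initialize(): the automatic checkpoint
// interval is the smallest whole multiple of the slide duration that is
// greater than or equal to 10 seconds.
def autoCheckpointIntervalMs(slideMs: Long): Long =
  slideMs * math.ceil(10000.0 / slideMs).toInt
```

For example, a 3-second slide yields 12 seconds (4 slides), while a 15-second slide yields 15 seconds (1 slide): the interval is "slideDuration or 10 seconds, whichever is larger", rounded up to a whole number of slides.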
The first case is obvious. Performing a naive analysis of the Spark Streaming code:

grep "val mustCheckpoint = true" $(find -type f -name "*.scala")

> ./org/apache/spark/streaming/api/python/PythonDStream.scala:   override val mustCheckpoint = true
> ./org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala:   override val mustCheckpoint = true
> ./org/apache/spark/streaming/dstream/StateDStream.scala:   override val mustCheckpoint = true
I found that, in general (ignoring PythonDStream), StreamingContext checkpointing only enables lineage checkpointing for StateDStream and ReducedWindowedDStream instances. These instances are the result of the transformations (respectively):
- updateStateByKey: that is, a stream providing state through several windows
- reduceByKeyAndWindow
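Putting the two cases together, the decision of whether (and how often) a DStream's lineage gets checkpointed can be modeled in plain Scala. This is a model with no Spark dependency, and the names below are illustrative rather than Spark's own:

```scala
// Model of a DStream's checkpoint-related state, as derived from the
// v1.5 sources quoted above. Illustrative names, not Spark's API.
case class DStreamModel(
  mustCheckpoint: Boolean,          // overridden to true by StateDStream / ReducedWindowedDStream
  explicitIntervalMs: Option[Long], // set by dstream.checkpoint(interval)
  slideMs: Long
)

// Effective lineage checkpoint interval, or None if lineage
// checkpointing is not enabled for this DStream at all.
def lineageCheckpointIntervalMs(ds: DStreamModel): Option[Long] =
  ds.explicitIntervalMs.orElse {
    if (ds.mustCheckpoint)
      // Automatic interval: smallest multiple of the slide >= 10 seconds.
      Some(ds.slideMs * math.ceil(10000.0 / ds.slideMs).toInt)
    else
      None // a plain (e.g. mapped) DStream gets no lineage checkpointing
  }
```

Under this model, a plain transformed DStream with no explicit checkpoint(interval) call ends up with None even when the context checkpoint directory is set, which matches the conclusion suggested by the grep above: for such a stream, only metadata checkpointing applies.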