为什么从检查点恢复时 Spark 会抛出 "SparkException: DStream has not been initialized"?
Why does Spark throw "SparkException: DStream has not been initialized" when restoring from checkpoint?
我正在从 HDFS 检查点(例如 ConstantInputDSTream)恢复流,但我一直收到 SparkException: <X> has not been initialized
。
从检查点恢复时,我需要做什么具体的事情吗?
我可以看到它想要设置 DStream.zeroTime
,但是恢复流时 zeroTime
是 null
。由于它是私有成员 IDK,因此可能无法恢复。我可以看到恢复流引用的 StreamingContext
确实具有 zeroTime
.
的值
initialize
是一个私有方法,在 StreamingContext.graph.start
处被调用但不被 StreamingContext.graph.restart
调用,大概是因为它期望 zeroTime
被持久化。
有人有从检查点恢复并具有 zeroTime
的非空值的 Stream 示例吗?
def createStreamingContext(): StreamingContext = {
val ssc = new StreamingContext(sparkConf, Duration(1000))
ssc.checkpoint(checkpointDir)
ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir), createStreamingContext)
val socketStream = ssc.socketTextStream(...)
socketStream.checkpoint(Seconds(1))
socketStream.foreachRDD(...)
问题是我在从检查点重新创建 StreamingContext 之后创建了 dstream,即在 StreamingContext.getOrCreate
之后。创建 dstream 和所有转换应该在 createStreamingContext
.
中
当您尝试对 2 个不同的 spark 流作业使用相同的检查点目录时,也可能会发生此异常。在那种情况下,你也会得到这个例外。
尝试为每个 spark 作业使用唯一的检查点目录。
错误 StreamingContext:启动上下文时出错,将其标记为已停止
org.apache.spark.SparkException: org.apache.spark.streaming.dstream.FlatMappedDStream@6c17c0f8 还没有初始化
在 org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:313)
在 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
在 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
在 scala.Option.orElse(Option.scala:289)
上述错误是由于我还有另一个 Spark 作业写入同一个 checkpointdir。尽管另一个 Spark 作业不是 运行,但它已写入 checkpointdir,新的 Spark 作业无法配置 StreamingContext。
我删除了checkpointdir的内容,重新提交了Spark Job,问题就解决了。
或者,您可以为每个 Spark 作业使用单独的检查点目录,以保持简单。
我正在从 HDFS 检查点(例如 ConstantInputDSTream)恢复流,但我一直收到 SparkException: <X> has not been initialized
。
从检查点恢复时,我需要做什么具体的事情吗?
我可以看到它想要设置 DStream.zeroTime
,但是恢复流时 zeroTime
是 null
。由于它是私有成员 IDK,因此可能无法恢复。我可以看到恢复流引用的 StreamingContext
确实具有 zeroTime
.
initialize
是一个私有方法,在 StreamingContext.graph.start
处被调用但不被 StreamingContext.graph.restart
调用,大概是因为它期望 zeroTime
被持久化。
有人有从检查点恢复并具有 zeroTime
的非空值的 Stream 示例吗?
def createStreamingContext(): StreamingContext = {
val ssc = new StreamingContext(sparkConf, Duration(1000))
ssc.checkpoint(checkpointDir)
ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir), createStreamingContext)
val socketStream = ssc.socketTextStream(...)
socketStream.checkpoint(Seconds(1))
socketStream.foreachRDD(...)
问题是我在从检查点重新创建 StreamingContext 之后创建了 dstream,即在 StreamingContext.getOrCreate
之后。创建 dstream 和所有转换应该在 createStreamingContext
.
当您尝试对 2 个不同的 spark 流作业使用相同的检查点目录时,也可能会发生此异常。在那种情况下,你也会得到这个例外。
尝试为每个 spark 作业使用唯一的检查点目录。
错误 StreamingContext:启动上下文时出错,将其标记为已停止 org.apache.spark.SparkException: org.apache.spark.streaming.dstream.FlatMappedDStream@6c17c0f8 还没有初始化 在 org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:313) 在 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) 在 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) 在 scala.Option.orElse(Option.scala:289)
上述错误是由于我还有另一个 Spark 作业写入同一个 checkpointdir。尽管另一个 Spark 作业不是 运行,但它已写入 checkpointdir,新的 Spark 作业无法配置 StreamingContext。
我删除了checkpointdir的内容,重新提交了Spark Job,问题就解决了。
或者,您可以为每个 Spark 作业使用单独的检查点目录,以保持简单。