Spark streaming 不记得以前的状态
Spark streaming not remembering previous state
我编写了带有状态转换的 spark 流程序。
似乎我的火花流应用程序正在使用检查点正确地进行计算。
但是如果我终止我的程序并再次启动它,它不会读取以前的检查点数据并从头开始。这是预期的行为吗?
我是否需要更改我的程序中的任何内容,以便它记住以前的数据并从那里开始计算?
提前致谢。
参考我的程序:
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("HBaseStream")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
val inputStream = ssc.socketTextStream(<hostname>, 9999)
ssc.checkpoint("hdfs://<hostname1>:8020/user/spark/checkpoints_dir")
inputStream.print(1)
val parsedStream = inputStream
.map(line => {
val splitLines = line.split(",")
(splitLines(1), splitLines.slice(2, splitLines.length).map((_.trim.toLong)))
})
import breeze.linalg.{DenseVector => BDV}
import scala.util.Try
val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
(current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
prev.map(_ +: current).orElse(Some(current))
.flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
})
state.checkpoint(Duration(10000))
state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
// Start the computation
ssc.start()
// Wait for the computation to terminate
ssc.awaitTermination()
}
}
根据 spark-streaming 文档,您应该以不同的方式初始化上下文:
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
如 checkpointing documentation 中所述,您必须调整代码才能从检查点恢复状态。
特别是您不能直接创建 StreamingContext
,而必须使用 StreamingContext.getOrCreate
方法,该方法需要:
- 检查点目录
- 可用于设置上下文的函数(
Unit => StreamingContext
)
我编写了带有状态转换的 spark 流程序。 似乎我的火花流应用程序正在使用检查点正确地进行计算。 但是如果我终止我的程序并再次启动它,它不会读取以前的检查点数据并从头开始。这是预期的行为吗?
我是否需要更改我的程序中的任何内容,以便它记住以前的数据并从那里开始计算?
提前致谢。
参考我的程序:
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("HBaseStream")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
val inputStream = ssc.socketTextStream(<hostname>, 9999)
ssc.checkpoint("hdfs://<hostname1>:8020/user/spark/checkpoints_dir")
inputStream.print(1)
val parsedStream = inputStream
.map(line => {
val splitLines = line.split(",")
(splitLines(1), splitLines.slice(2, splitLines.length).map((_.trim.toLong)))
})
import breeze.linalg.{DenseVector => BDV}
import scala.util.Try
val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
(current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
prev.map(_ +: current).orElse(Some(current))
.flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
})
state.checkpoint(Duration(10000))
state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
// Start the computation
ssc.start()
// Wait for the computation to terminate
ssc.awaitTermination()
}
}
根据 spark-streaming 文档,您应该以不同的方式初始化上下文:
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
如 checkpointing documentation 中所述,您必须调整代码才能从检查点恢复状态。
特别是您不能直接创建 StreamingContext
,而必须使用 StreamingContext.getOrCreate
方法,该方法需要:
- 检查点目录
- 可用于设置上下文的函数(
Unit => StreamingContext
)