Spark Streaming Write Ahead Log not replaying data after restart
To have a simple way to test the Spark Streaming write-ahead log, I created a very simple custom input receiver that generates strings and stores them:
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class InMemoryStringReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  val batchID = System.currentTimeMillis()

  def onStart() {
    new Thread("InMemoryStringReceiver") {
      override def run(): Unit = {
        var i = 0
        while (true) {
          // http://spark.apache.org/docs/latest/streaming-custom-receivers.html
          // To implement a reliable receiver, you have to use store(multiple-records) to store data.
          store(ArrayBuffer(s"$batchID-$i"))
          println(s"Stored => [$batchID-$i)]")
          Thread.sleep(1000L)
          i = i + 1
        }
      }
    }.start()
  }

  def onStop() {}
}
Then I created a simple application that uses the custom receiver to stream the data and process it:
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object DStreamResilienceTest extends App {

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("DStreamResilienceTest")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint("hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest")
  val customReceiverStream: ReceiverInputDStream[String] = ssc.receiverStream(new InMemoryStringReceiver())
  customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
    println(s"processed => [${rdd.collect().toList}]")
    Thread.sleep(2000L)
  }
  ssc.start()
  ssc.awaitTermination()
}
As you can see, processing each received RDD has a 2-second sleep while strings are stored every second. This creates a backlog: new strings pile up and should be persisted in the WAL. Indeed, I can see the files in the checkpoint directory being updated. Running the application, I get output like this:
[info] Stored => [1453374654941-0)]
[info] processed => [List(1453374654941-0)]
[info] Stored => [1453374654941-1)]
[info] Stored => [1453374654941-2)]
[info] processed => [List(1453374654941-1)]
[info] Stored => [1453374654941-3)]
[info] Stored => [1453374654941-4)]
[info] processed => [List(1453374654941-2)]
[info] Stored => [1453374654941-5)]
[info] Stored => [1453374654941-6)]
[info] processed => [List(1453374654941-3)]
[info] Stored => [1453374654941-7)]
[info] Stored => [1453374654941-8)]
[info] processed => [List(1453374654941-4)]
[info] Stored => [1453374654941-9)]
[info] Stored => [1453374654941-10)]
As you would expect, the data is being stored faster than it is processed. So I killed the application and restarted it. This time I commented out the sleep in foreachRDD so that processing could clear any backlog:
[info] Stored => [1453374753946-0)]
[info] processed => [List(1453374753946-0)]
[info] Stored => [1453374753946-1)]
[info] processed => [List(1453374753946-1)]
[info] Stored => [1453374753946-2)]
[info] processed => [List(1453374753946-2)]
[info] Stored => [1453374753946-3)]
[info] processed => [List(1453374753946-3)]
[info] Stored => [1453374753946-4)]
[info] processed => [List(1453374753946-4)]
As you can see, the new events are processed but none from the previous run. The old WAL logs are cleaned up, and I see log messages like the one below, but the old data does not get processed.
INFO WriteAheadLogManager : Recovered 1 write ahead log files from hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest/receivedData/0
What am I doing wrong? I am using Spark 1.5.2.
Shixiong (Ryan) Zhu answered this on the Spark Users mailing list: use StreamingContext.getOrCreate, as he suggested.
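In other words, the driver above builds a brand-new StreamingContext on every start, so the checkpoint (and the WAL blocks it references) is never used for recovery. Below is a minimal sketch of the fix, assuming the same InMemoryStringReceiver and checkpoint path as above; the createContext helper name is just for illustration:

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamResilienceTest extends App {

  val checkpointDir = "hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest"

  // Runs only when no checkpoint exists yet. On restart, getOrCreate rebuilds the
  // whole context (including the registered foreachRDD output operation) from the
  // checkpoint, and the receiver blocks stored in the WAL are replayed.
  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("DStreamResilienceTest")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    val customReceiverStream = ssc.receiverStream(new InMemoryStringReceiver())
    customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
      println(s"processed => [${rdd.collect().toList}]")
    }
    ssc
  }

  val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
  ssc.start()
  ssc.awaitTermination()
}

On the first run getOrCreate finds no checkpoint and calls the creating function; after a kill and restart it restores the context from the checkpoint directory instead, and the data that was written to the WAL but not yet processed is replayed.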