Spark Structured Streaming from Kafka - last message processed again after resuming from checkpoint
I am using the brand new (and still labeled "alpha") Structured Streaming of Spark 2.0.2 to read messages from a Kafka topic and update a couple of Cassandra tables from them:
val readStream = sparkSession.readStream
  .format("kafka")
  .option("subscribe", "maxwell")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load
  .as[KafkaMessage]
  .map(<transform KafkaMessage to Company>)

val writeStream = readStream
  .writeStream
  .queryName("CompanyUpdatesInCassandra")
  .foreach(new ForeachWriter[Company] {
    def open(partitionId: Long, version: Long): Boolean = {
      true
    }

    def process(company: Company): Unit = {
      ...
    }

    def close(errorOrNull: Throwable): Unit = {}
  })
  .start

writeStream.awaitTermination
I also configured a checkpoint location ("spark.sql.streaming.checkpointLocation") on the sparkSession. This lets the query pick up the messages that arrived while the streaming app was down as soon as it resumes.
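For context, a minimal sketch of how such a checkpoint location could be set on the session; the app name and path below are placeholders, not taken from the post, and the same setting can also be passed per query via .option("checkpointLocation", ...) on the writeStream:

import org.apache.spark.sql.SparkSession

// Sketch only: configure the streaming checkpoint location on the session.
// The application name and path are placeholders.
val sparkSession = SparkSession.builder()
  .appName("CompanyUpdatesInCassandra")
  .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints/company-updates")
  .getOrCreate()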
However, since configuring this checkpoint location, I have noticed that on resume the query also consistently processes the last message of the previous batch again, even though that message was already processed correctly without failure.
Any idea what I am doing wrong? This seems like a very common use case.
More info:
See the relevant logs below (offset 5876 is the last offset that was successfully processed in the previous batch):
[INFO] 12:44:02.294 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Resuming streaming query, starting with batch 31
[DEBUG] 12:44:02.297 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Found possibly uncommitted offsets {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5877)]}
[DEBUG] 12:44:02.300 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Resuming with committed offsets: {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5876)]}
[DEBUG] 12:44:02.301 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Stream running from {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5876)]} to {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5877)]}
[INFO] 12:44:02.310 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: GetBatch called with start = Some([(maxwell-0,5876)]), end = [(maxwell-0,5877)]
[INFO] 12:44:02.311 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Partitions added: Map()
[DEBUG] 12:44:02.313 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: TopicPartitions: maxwell-0
[DEBUG] 12:44:02.318 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Sorted executors:
[INFO] 12:44:02.415 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(maxwell-0,5876,5877,None)
[DEBUG] 12:44:02.467 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Retrieving data from KafkaSource[Subscribe[maxwell]]: Some([(maxwell-0,5876)]) -> [(maxwell-0,5877)]
[DEBUG] 12:44:09.242 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Creating iterator for KafkaSourceRDDOffsetRange(maxwell-0,5876,5877,None)
[INFO] 12:44:09.879 [Executor task launch worker-0] biz.meetmatch.streaming.CompanyUpdateListener$$anon: open (partitionId:0, version:31)
[DEBUG] 12:44:09.880 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Get spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor maxwell-0 nextOffset -2 requested 5876
[INFO] 12:44:09.881 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Initial fetch for maxwell-0 5876
[DEBUG] 12:44:09.881 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Seeking to spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor maxwell-0 5876
[DEBUG] 12:44:10.049 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Polled spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor [maxwell-0] 1
Also, when I kill the stream, I make sure it stops gracefully to avoid data loss:
sys.ShutdownHookThread {
  writeStream.stop
  sparkSession.stop
}
Currently, Structured Streaming checkpoints its state when a new offset is generated. So the case you describe is expected: the last committed batch may be reprocessed after recovery. However, that is an internal implementation detail; even if the checkpoint were written at the moment a batch is committed, the checkpoint itself could still fail, and your sink, the ForeachWriter, would need to handle that case as well.
In general, your sink should always be idempotent.
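As an illustration only (not from the original post), here is a sketch of what an idempotent ForeachWriter could look like, assuming a companies table keyed by id and the DataStax Java driver; because a Cassandra INSERT is an upsert on the primary key, replaying the last batch after a restart simply rewrites the same rows:

import com.datastax.driver.core.{Cluster, Session}
import org.apache.spark.sql.ForeachWriter

// Hypothetical shape of the Company case class; the real fields are not shown in the post.
case class Company(id: String, name: String)

class IdempotentCompanyWriter extends ForeachWriter[Company] {
  @transient private var cluster: Cluster = _
  @transient private var session: Session = _

  override def open(partitionId: Long, version: Long): Boolean = {
    // Contact point and keyspace name are assumptions for the sketch.
    cluster = Cluster.builder().addContactPoint("localhost").build()
    session = cluster.connect("my_keyspace")
    true
  }

  override def process(company: Company): Unit = {
    // INSERT overwrites any existing row with the same primary key,
    // so reprocessing a message leaves the table in the same state.
    session.execute("INSERT INTO companies (id, name) VALUES (?, ?)", company.id, company.name)
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (session != null) session.close()
    if (cluster != null) cluster.close()
  }
}

Alternatively, the (partitionId, version) pair passed to open can be recorded and checked so that data already written for a given batch is skipped on replay.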
Update: in Spark 2.2.0, Structured Streaming no longer reruns a batch after recovery if it completed successfully.