Spark Kafka Streaming making progress but there is no data to be consumed
I have a simple Spark Structured Streaming job that reads from Kafka using the Kafka 0.10 API and writes to our S3 storage. From the logs I can see that for every triggered batch the streaming application is making progress and appears to be consuming data from the source, since endOffset is greater than startOffset and both advance with every batch. Yet numInputRows is always zero, and no rows are written to S3.
Why are the offsets advancing while the Spark batches consume no data?
19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
"id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
"runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
"name" : null,
"timestamp" : "2019-09-10T15:55:00.000Z",
"batchId" : 189,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"addBatch" : 127,
"getBatch" : 0,
"getEndOffset" : 0,
"queryPlanning" : 24,
"setOffsetRange" : 36,
"triggerExecution" : 1859,
"walCommit" : 1032
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[my_kafka_topic]]",
"startOffset" : {
"my_kafka_topic" : {
"23" : 1206926686,
"8" : 1158514946,
"17" : 1258387219,
"11" : 1263091642,
"2" : 1226741128,
"20" : 1229560889,
"5" : 1170304913,
"14" : 1207333901,
"4" : 1274242728,
"13" : 1336386658,
"22" : 1260210993,
"7" : 1288639296,
"16" : 1247462229,
"10" : 1093157103,
"1" : 1219904858,
"19" : 1116269615,
"9" : 1238935018,
"18" : 1069224544,
"12" : 1256018541,
"3" : 1251150202,
"21" : 1256774117,
"15" : 1170591375,
"6" : 1185108169,
"24" : 1202342095,
"0" : 1165356330
}
},
"endOffset" : {
"my_kafka_topic" : {
"23" : 1206928043,
"8" : 1158516721,
"17" : 1258389219,
"11" : 1263093490,
"2" : 1226743225,
"20" : 1229562962,
"5" : 1170307882,
"14" : 1207335736,
"4" : 1274245585,
"13" : 1336388570,
"22" : 1260213582,
"7" : 1288641384,
"16" : 1247464311,
"10" : 1093159186,
"1" : 1219906407,
"19" : 1116271435,
"9" : 1238936994,
"18" : 1069226913,
"12" : 1256020926,
"3" : 1251152579,
"21" : 1256776910,
"15" : 1170593216,
"6" : 1185110032,
"24" : 1202344538,
"0" : 1165358262
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "FileSink[s3://my-s3-bucket/data/kafka/my_kafka_topic]"
}
}
A simplified version of the Spark code is below:
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration.Duration

val df = sparkSession
  .readStream
  .format("kafka")
  .options(Map(
    "kafka.bootstrap.servers" -> "host:1009",
    "subscribe" -> "my_kafka_topic",
    "kafka.client.id" -> "my-client-id",
    // source options are plain strings; maxOffsetsPerTrigger caps the
    // number of records read per micro-batch
    "maxOffsetsPerTrigger" -> "1000",
    "fetch.message.max.bytes" -> "6048576"
  ))
  .load()

df
  .writeStream
  .partitionBy("date", "hour")
  .outputMode(OutputMode.Append())
  .format("parquet")
  .options(Map("checkpointLocation" -> "checkpoint", "path" -> "data"))
  .trigger(Trigger.ProcessingTime(Duration("5 minutes")))
  .start()
  .awaitTermination()
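For reference, the per-trigger progress that MicroBatchExecution logs above can also be read programmatically from the query handle. Below is a minimal monitoring sketch of my own, assuming the df and imports from the snippet above and keeping the StreamingQuery returned by start() instead of chaining .awaitTermination() directly:

val query = df
  .writeStream
  .partitionBy("date", "hour")
  .outputMode(OutputMode.Append())
  .format("parquet")
  .options(Map("checkpointLocation" -> "checkpoint", "path" -> "data"))
  .trigger(Trigger.ProcessingTime(Duration("5 minutes")))
  .start()

// lastProgress is the most recent StreamingQueryProgress (null before the first
// trigger); its batchId, numInputRows and per-source offsets are the same fields
// that appear in the JSON log above.
Option(query.lastProgress).foreach { p =>
  println(s"batchId=${p.batchId} numInputRows=${p.numInputRows}")
  p.sources.foreach(s => println(s"${s.description} start=${s.startOffset} end=${s.endOffset}"))
}

query.awaitTermination()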
Edit: I also see the following in the logs before each batch is executed:
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-5 to offset 1168959116.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-1 to offset 1218619371.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-8 to offset 1157205346.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-21 to offset 1255403059.
Could you check whether the case described in the link below, concerning the output directory and the checkpoint location, applies to your situation?
https://kb.databricks.com/streaming/file-sink-streaming.html
I ran into exactly this problem, offsets being updated but no input rows, when I cleared my checkpoint location to restart the stream but kept writing the streamed data to the old target location (which had not been cleared). After cleaning (changing) both the checkpoint and the write location, it worked fine.
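In code, that "clean or change both locations" fix amounts to restarting the query against a fresh checkpoint location and a fresh output path. A minimal sketch, where the "-v2" suffixed paths are placeholders of mine and not anything from the original job:

// Sketch only: point the restarted query at a new checkpoint location AND a new
// output path, so neither the old checkpoint nor the old _spark_metadata in the
// sink directory is reused. The "-v2" paths are hypothetical placeholders.
df.writeStream
  .partitionBy("date", "hour")
  .outputMode(OutputMode.Append())
  .format("parquet")
  .options(Map(
    "checkpointLocation" -> "s3://my-s3-bucket/checkpoints/my_kafka_topic-v2",
    "path"               -> "s3://my-s3-bucket/data/kafka/my_kafka_topic-v2"
  ))
  .trigger(Trigger.ProcessingTime(Duration("5 minutes")))
  .start()
  .awaitTermination()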
In this particular case, when I cleared the checkpoint location the offsets were updated correctly. But because I did not clear the target location (it held data from 5-6 months of continuous streaming, i.e. 100s of 1000s of small files to delete), Spark apparently checked the spark metadata (the _spark_metadata directory the file sink keeps in the output path) and, since it found the old data recorded there, did not consume any new data.
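One way to confirm this is to check whether the sink directory still contains a _spark_metadata log from the earlier run. A minimal sketch using the Hadoop FileSystem API, with the path taken from the FileSink description in the progress log above:

import org.apache.hadoop.fs.Path

// The file sink records every committed batch under <output path>/_spark_metadata.
// If that log is left over from a previous run while the checkpoint is fresh, the
// sink can treat the restarted batch ids as already committed and skip writing them.
val metadataPath = new Path("s3://my-s3-bucket/data/kafka/my_kafka_topic/_spark_metadata")
val fs = metadataPath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)

if (fs.exists(metadataPath)) {
  // The file names correspond to the batch ids (plus compacted logs) that the
  // sink believes it has already written.
  fs.listStatus(metadataPath).foreach(status => println(status.getPath.getName))
}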