如何使用 spark 结构化流管理从 kafka 读取的偏移量
how to manage offset read from kafka with spark structured stream
我有一个 spark 结构化流作业,需要从 kafka 主题读取数据并进行一些聚合。该作业需要每天重新启动,但是当它重新启动时,如果我设置 startingOffsets="latest"
,我将丢失重新启动时间之间的数据。如果我设置 startingOffsets="earliest"
那么作业将从主题中读取所有数据,而不是从最后一个流作业离开的地方读取。任何人都可以帮助我如何配置以在最后一个流媒体作业离开的地方设置偏移量吗?
我正在使用 Spark 2.4.0 和 kafka 2.1.1,我尝试为写入作业设置检查点位置但似乎 Spark 不检查 kafka 消息的偏移量,因此它会根据 startingOffsets 不断检查最后一个偏移量或第一个偏移量。
这是我的 spark 从 kafka 读取的配置:
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", host)
.option("subscribe", topic)
.option("startingOffsets", offset)
.option("enable.auto.commit", "false")
.load()
例如,kafka 主题有 10 条消息,偏移量从 1 到 10,spark 刚刚处理完消息 5,然后重新启动。如何让 spark 继续阅读消息 5 而不是消息 1 或 11?
似乎使用一些代码我可以获取我需要的偏移量并将其保存到一些可靠的存储中,比如 cassandra。然后当火花流开始时,我只需要读取保存的偏移量并将其填充到 startingOffsets。
这是帮助我获得所需偏移量的代码
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener._
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
println("Query started:" + queryStarted.id)
}
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
println("Query terminated" + queryTerminated.id)
}
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
println("Query made progress")
println("Starting offset:" + queryProgress.progress.sources(0).startOffset)
println("Ending offset:" + queryProgress.progress.sources(0).endOffset)
//Logic to save these offsets
// the logic to save the offset write in here
}
})
我有一个 spark 结构化流作业,需要从 kafka 主题读取数据并进行一些聚合。该作业需要每天重新启动,但是当它重新启动时,如果我设置 startingOffsets="latest"
,我将丢失重新启动时间之间的数据。如果我设置 startingOffsets="earliest"
那么作业将从主题中读取所有数据,而不是从最后一个流作业离开的地方读取。任何人都可以帮助我如何配置以在最后一个流媒体作业离开的地方设置偏移量吗?
我正在使用 Spark 2.4.0 和 kafka 2.1.1,我尝试为写入作业设置检查点位置但似乎 Spark 不检查 kafka 消息的偏移量,因此它会根据 startingOffsets 不断检查最后一个偏移量或第一个偏移量。
这是我的 spark 从 kafka 读取的配置:
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", host)
.option("subscribe", topic)
.option("startingOffsets", offset)
.option("enable.auto.commit", "false")
.load()
例如,kafka 主题有 10 条消息,偏移量从 1 到 10,spark 刚刚处理完消息 5,然后重新启动。如何让 spark 继续阅读消息 5 而不是消息 1 或 11?
似乎使用一些代码我可以获取我需要的偏移量并将其保存到一些可靠的存储中,比如 cassandra。然后当火花流开始时,我只需要读取保存的偏移量并将其填充到 startingOffsets。 这是帮助我获得所需偏移量的代码
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener._
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
println("Query started:" + queryStarted.id)
}
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
println("Query terminated" + queryTerminated.id)
}
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
println("Query made progress")
println("Starting offset:" + queryProgress.progress.sources(0).startOffset)
println("Ending offset:" + queryProgress.progress.sources(0).endOffset)
//Logic to save these offsets
// the logic to save the offset write in here
}
})