Spark 结构化流确认消息
Spark structured streaming acknowledge messages
我正在使用 Spark Structured Streaming 读取 Kafka 主题(比如 topic1)并使用 SINK 写入另一个主题(topic1-result)。在使用 Sink 写入另一个主题后,我可以看到消息没有从 Topic1 中删除。
// Subscribe to 1 topic
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1")
.option("subscribe", "topic1")
.load()
//SINK to another topic
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1")
.option("checkpointLocation", "/tmp/checkpoint1")
.option("topic", "topic1-result")
.start()
文档说我们不能对结构化流使用自动提交
enable.auto.commit: Kafka source doesn’t commit any offset.
但是如何确认消息并从主题 (topic1) 中删除已处理的消息
两个注意事项:
提交后,消息不会从 Kafka 中删除。当您的消费者执行提交时,Kafka 会增加此主题相对于已创建的消费者组的偏移量。但消息会保留在主题中,具体取决于您为主题配置的保留时间。
事实上,Kafka 源不进行提交,流存储指向流检查点目录中下一条消息的偏移量。因此,当您重新启动流式传输时,它会使用最后一个偏移量来消耗它。
我正在使用 Spark Structured Streaming 读取 Kafka 主题(比如 topic1)并使用 SINK 写入另一个主题(topic1-result)。在使用 Sink 写入另一个主题后,我可以看到消息没有从 Topic1 中删除。
// Subscribe to 1 topic
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1")
.option("subscribe", "topic1")
.load()
//SINK to another topic
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1")
.option("checkpointLocation", "/tmp/checkpoint1")
.option("topic", "topic1-result")
.start()
文档说我们不能对结构化流使用自动提交
enable.auto.commit: Kafka source doesn’t commit any offset.
但是如何确认消息并从主题 (topic1) 中删除已处理的消息
两个注意事项:
提交后,消息不会从 Kafka 中删除。当您的消费者执行提交时,Kafka 会增加此主题相对于已创建的消费者组的偏移量。但消息会保留在主题中,具体取决于您为主题配置的保留时间。
事实上,Kafka 源不进行提交,流存储指向流检查点目录中下一条消息的偏移量。因此,当您重新启动流式传输时,它会使用最后一个偏移量来消耗它。