使用 Kafka 的 Spark 结构化批处理作业管理偏移量
Managing Offsets with Spark Structured Batch Job with Kafka
我有一个写批处理作业的用例
我需要读取 Kafka 主题并将数据记录到 HDFS。我的代码如下所示
val df: DataFrame = spark.read
.format("kafka")
.option("subscribe", "test-topic")
.option("includeTimestamp", true)
.option("kafka.bootstrap.servers", "localhost:9092")
.option("group.id", "test-cg")
.option("checkpointLocation", "/group/test/checkpointsDir")
.load
df.write.
parquet(buildPathWithCurrentBatchTime())
每次作业读取 Kafka 主题时,它都会从最早的偏移量开始,因此同一条消息会被记录在多个批次中。
如何让我的作业读取从前一个作业实例读取的偏移量之后的偏移量开始的消息。
我尝试设置检查点位置、组 ID 但没有帮助。
我不想使用流式查询。我有一个简单的用例,记录来自 Kafka Topic 的数据。我没有任何延迟要求。唯一的要求是期刊中不能有任何重复。这是一个低优先级。如果我使用流式查询,它将一直使用执行程序,这是一种资源浪费。因此我想分批进行
您使用的是批量查询,而不是流式查询。 (也许缺少点?)只需将 read
替换为 readStream
并将 write
替换为 writeStream
即可。
编辑:正如 OP 阐明的那样,可以使用一次性触发器,我刚刚更新了代码以使用带有一次性触发器的结构化流。
(免责声明:我没有 compile/run 代码,但更改适合结构化流媒体指南文档。)
val df: DataFrame = spark.readStream
.format("kafka")
.option("subscribe", "test-topic")
.option("includeTimestamp", true)
.option("kafka.bootstrap.servers", "localhost:9092")
.option("group.id", "test-cg")
.option("checkpointLocation", "/group/test/checkpointsDir")
.load
val query = df.writeStream
.format("parquet")
.option("path", buildPathWithCurrentBatchTime())
.trigger(Trigger.Once())
.start()
query.awaitTermination()
我有一个写批处理作业的用例
我需要读取 Kafka 主题并将数据记录到 HDFS。我的代码如下所示
val df: DataFrame = spark.read
.format("kafka")
.option("subscribe", "test-topic")
.option("includeTimestamp", true)
.option("kafka.bootstrap.servers", "localhost:9092")
.option("group.id", "test-cg")
.option("checkpointLocation", "/group/test/checkpointsDir")
.load
df.write.
parquet(buildPathWithCurrentBatchTime())
每次作业读取 Kafka 主题时,它都会从最早的偏移量开始,因此同一条消息会被记录在多个批次中。 如何让我的作业读取从前一个作业实例读取的偏移量之后的偏移量开始的消息。
我尝试设置检查点位置、组 ID 但没有帮助。
我不想使用流式查询。我有一个简单的用例,记录来自 Kafka Topic 的数据。我没有任何延迟要求。唯一的要求是期刊中不能有任何重复。这是一个低优先级。如果我使用流式查询,它将一直使用执行程序,这是一种资源浪费。因此我想分批进行
您使用的是批量查询,而不是流式查询。 (也许缺少点?)只需将 read
替换为 readStream
并将 write
替换为 writeStream
即可。
编辑:正如 OP 阐明的那样,可以使用一次性触发器,我刚刚更新了代码以使用带有一次性触发器的结构化流。 (免责声明:我没有 compile/run 代码,但更改适合结构化流媒体指南文档。)
val df: DataFrame = spark.readStream
.format("kafka")
.option("subscribe", "test-topic")
.option("includeTimestamp", true)
.option("kafka.bootstrap.servers", "localhost:9092")
.option("group.id", "test-cg")
.option("checkpointLocation", "/group/test/checkpointsDir")
.load
val query = df.writeStream
.format("parquet")
.option("path", buildPathWithCurrentBatchTime())
.trigger(Trigger.Once())
.start()
query.awaitTermination()