如何处理 kafka 主题中的旧数据?
How can I handle old data in the kafka topic?
我开始使用 spark 结构化流。
我从 kafka 主题获取 readStream(startOffset:最新)
带水印,
按持续时间 window 的事件时间分组,
并写入 kafka 主题。
我的问题是,
spark structured streaming job之前写入kafka topic的数据如何处理?
我一开始尝试 运行 使用 `startOffset: earliest'。但是kafka主题中的数据太大,所以由于yarn超时,spark streaming进程没有启动。 (即使我增加了超时值)
1。
如果我只是创建一个批处理作业并按特定数据范围进行过滤。
结果未反映在火花流的当前状态中,
结果的一致性和准确性似乎有问题
- 我尝试重置检查点目录,但没有成功。
如何处理旧数据和大数据?
帮帮我。
您可以尝试使用参数 maxOffsetsPerTrigger
for Kafka + Structured Streaming 从 Kafka 接收旧数据。将此参数的值设置为您希望一次从 Kafka 接收的记录数。
使用:
sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test-name")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 1)
.option("group.id", "2")
.option("auto.offset.reset", "earliest")
.load()
我开始使用 spark 结构化流。
我从 kafka 主题获取 readStream(startOffset:最新) 带水印, 按持续时间 window 的事件时间分组, 并写入 kafka 主题。
我的问题是, spark structured streaming job之前写入kafka topic的数据如何处理?
我一开始尝试 运行 使用 `startOffset: earliest'。但是kafka主题中的数据太大,所以由于yarn超时,spark streaming进程没有启动。 (即使我增加了超时值)
1。 如果我只是创建一个批处理作业并按特定数据范围进行过滤。 结果未反映在火花流的当前状态中, 结果的一致性和准确性似乎有问题
- 我尝试重置检查点目录,但没有成功。
如何处理旧数据和大数据? 帮帮我。
您可以尝试使用参数 maxOffsetsPerTrigger
for Kafka + Structured Streaming 从 Kafka 接收旧数据。将此参数的值设置为您希望一次从 Kafka 接收的记录数。
使用:
sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test-name")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 1)
.option("group.id", "2")
.option("auto.offset.reset", "earliest")
.load()