有没有办法从 Spark 流作业中读取 Kafka 流中的特定偏移量?
Is there a way to read from specific offset in a Kafka stream from a Spark streaming job?
我正在尝试使用以下方法将我的 Spark 流作业的偏移量提交给 Kafka:
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// some time later, after outputs have completed
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
正如我从这个问题中得到的:
这工作正常,正在提交偏移量。然而,问题在于这是异步的,这意味着即使在线下发送了两个偏移量提交之后,Kafka 仍可能保留之前两个提交的偏移量。如果消费者此时崩溃,而我将其恢复,它会开始读取已经处理过的消息。
现在,来自其他来源,喜欢这里的评论部分:
https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o
我知道无法从 Spark 流作业同步提交偏移量(尽管如果我使用 Kafka 流,则可以)。人们宁愿建议将偏移量保留在数据库中,您将计算的最终结果保存在流中。
现在,我的问题是:
如果我确实将当前读取的偏移量存储在我的数据库中,那么下次如何从准确的偏移量开始读取流?
我研究并找到了我的问题的答案,所以我将它张贴在这里以供可能遇到相同问题的其他人使用:
创建一个 Map 对象,以 org.apache.kafka.common.TopicPartition 为键,一个 Long 为值。 TopicPartition 构造函数有两个参数,主题名称和您将从中读取的分区。 Map 对象的值是您要从中读取流的偏移量的长表示形式。
Map startingOffset = new HashMap<>();
startingOffset.put(新主题分区("topic_name", 0), 3332980L);
将流内容读入适当的 JavaInputStream,并将先前创建的 Map 对象作为参数提供给 ConsumerStrategies.Subscribe() 方法。
最终 JavaInputDStream> stream = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(主题, kafkaParams, startingOffset));
我正在尝试使用以下方法将我的 Spark 流作业的偏移量提交给 Kafka:
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// some time later, after outputs have completed
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
正如我从这个问题中得到的:
这工作正常,正在提交偏移量。然而,问题在于这是异步的,这意味着即使在线下发送了两个偏移量提交之后,Kafka 仍可能保留之前两个提交的偏移量。如果消费者此时崩溃,而我将其恢复,它会开始读取已经处理过的消息。
现在,来自其他来源,喜欢这里的评论部分:
https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o
我知道无法从 Spark 流作业同步提交偏移量(尽管如果我使用 Kafka 流,则可以)。人们宁愿建议将偏移量保留在数据库中,您将计算的最终结果保存在流中。
现在,我的问题是: 如果我确实将当前读取的偏移量存储在我的数据库中,那么下次如何从准确的偏移量开始读取流?
我研究并找到了我的问题的答案,所以我将它张贴在这里以供可能遇到相同问题的其他人使用:
创建一个 Map 对象,以 org.apache.kafka.common.TopicPartition 为键,一个 Long 为值。 TopicPartition 构造函数有两个参数,主题名称和您将从中读取的分区。 Map 对象的值是您要从中读取流的偏移量的长表示形式。
Map startingOffset = new HashMap<>(); startingOffset.put(新主题分区("topic_name", 0), 3332980L);
将流内容读入适当的 JavaInputStream,并将先前创建的 Map 对象作为参数提供给 ConsumerStrategies.Subscribe() 方法。
最终 JavaInputDStream> stream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(主题, kafkaParams, startingOffset));