有没有办法从 Spark 流作业中读取 Kafka 流中的特定偏移量?

Is there a way to read from specific offset in a Kafka stream from a Spark streaming job?

我正在尝试使用以下方法将我的 Spark 流作业的偏移量提交给 Kafka:

OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

            // some time later, after outputs have completed
              ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);

正如我从这个问题中得到的:

这工作正常,正在提交偏移量。然而,问题在于这是异步的,这意味着即使在线下发送了两个偏移量提交之后,Kafka 仍可能保留之前两个提交的偏移量。如果消费者此时崩溃,而我将其恢复,它会开始读取已经处理过的消息。

现在,来自其他来源,喜欢这里的评论部分:

https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o

我知道无法从 Spark 流作业同步提交偏移量(尽管如果我使用 Kafka 流,则可以)。人们宁愿建议将偏移量保留在数据库中,您将计算的最终结果保存在流中。

现在,我的问题是: 如果我确实将当前读取的偏移量存储在我的数据库中,那么下次如何从准确的偏移量开始读取流?

我研究并找到了我的问题的答案,所以我将它张贴在这里以供可能遇到相同问题的其他人使用:

  • 创建一个 Map 对象,以 org.apache.kafka.common.TopicPartition 为键,一个 Long 为值。 TopicPartition 构造函数有两个参数,主题名称和您将从中读取的分区。 Map 对象的值是您要从中读取流的偏移量的长表示形式。

    Map startingOffset = new HashMap<>(); startingOffset.put(新主题分区("topic_name", 0), 3332980L);

  • 将流内容读入适当的 JavaInputStream,并将先前创建的 Map 对象作为参数提供给 ConsumerStrategies.Subscribe() 方法。

    最终 JavaInputDStream> stream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(主题,​​ kafkaParams, startingOffset));