如何将 Spark 消耗的最新偏移量保存到 ZK 或 Kafka 并在重启后可以读回

How to save latest offset that Spark consumed to ZK or Kafka and can read back after restart

我正在使用 Kafka 0.8.2 从 AdExchange 接收数据,然后我使用 Spark Streaming 1.4.1 将数据存储到 MongoDB

我的问题是当我重新启动我的 Spark Streaming 作业时,例如更新新版本、修复错误、添加新功能。它将继续读取 kafka 的最新 offset 然后我将在重新启动作业期间丢失数据 AdX 推送到 kafka。

我尝试了类似 auto.offset.reset -> smallest 的方法,但它会从 0 接收 -> 最后数据很大并且在 db 中重复。

我也尝试将特定的 group.idconsumer.id 设置为 Spark 但它是一样的。

如何将最近消耗的 offset 火花保存到 zookeeperkafka 然后可以从中读取到最新的 offset?

createDirectStream 函数的构造函数之一可以获得一个映射,它将分区 ID 作为键,并将您开始使用的偏移量作为值。

只看api这里:http://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html 我所说的地图通常称为:fromOffsets

您可以向地图中插入数据:

startOffsetsMap.put(TopicAndPartition(topicName,partitionId), startOffset)

并在创建直接流时使用它:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
                streamingContext, kafkaParams, startOffsetsMap, messageHandler(_))

每次迭代后,您可以使用以下方法获取处理后的偏移量:

rdd.asInstanceOf[HasOffsetRanges].offsetRanges

您将能够在下一次迭代中使用此数据构建 fromOffsets 映射。

您可以在此处查看完整代码和用法:https://spark.apache.org/docs/latest/streaming-kafka-integration.html 在页面末尾

我还没有 100% 解决这个问题,但最好的办法可能是设置 JavaStreamingContext.checkpoint()。

有关示例,请参阅 https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointing

根据一些博客条目 https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md 有一些警告,但几乎感觉它涉及某些边缘案例,这些案例只是被提及而没有实际解释。

添加到 Michael Kopaniov 的回答中,如果您真的想使用 ZK 作为存储和加载偏移图的位置,您可以。

但是,由于您的结果没有输出到 ZK,除非您的输出操作是幂等的(听起来好像不是),否则您将无法获得可靠的语义。

如果可以在单个原子操作中将结果与偏移量一起存储在 mongo 中的同一个文档中,那可能对您更好。

有关详细信息,请参阅 https://www.youtube.com/watch?v=fXnNEq1v3VA

这里有一些代码可以用来在 ZK 中存储偏移量 http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/

这里有一些代码,您可以在调用 KafkaUtils.createDirectStream 时使用偏移量: http://geeks.aretotally.in/spark-streaming-direct-api-reusing-offset-from-zookeeper/