如何将 Spark 消耗的最新偏移量保存到 ZK 或 Kafka 并在重启后可以读回
How to save latest offset that Spark consumed to ZK or Kafka and can read back after restart
我正在使用 Kafka 0.8.2
从 AdExchange 接收数据,然后我使用 Spark Streaming 1.4.1
将数据存储到 MongoDB
。
我的问题是当我重新启动我的 Spark Streaming
作业时,例如更新新版本、修复错误、添加新功能。它将继续读取 kafka
的最新 offset
然后我将在重新启动作业期间丢失数据 AdX 推送到 kafka。
我尝试了类似 auto.offset.reset -> smallest
的方法,但它会从 0 接收 -> 最后数据很大并且在 db 中重复。
我也尝试将特定的 group.id
和 consumer.id
设置为 Spark
但它是一样的。
如何将最近消耗的 offset
火花保存到 zookeeper
或 kafka
然后可以从中读取到最新的 offset
?
createDirectStream 函数的构造函数之一可以获得一个映射,它将分区 ID 作为键,并将您开始使用的偏移量作为值。
只看api这里:http://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html
我所说的地图通常称为:fromOffsets
您可以向地图中插入数据:
startOffsetsMap.put(TopicAndPartition(topicName,partitionId), startOffset)
并在创建直接流时使用它:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
streamingContext, kafkaParams, startOffsetsMap, messageHandler(_))
每次迭代后,您可以使用以下方法获取处理后的偏移量:
rdd.asInstanceOf[HasOffsetRanges].offsetRanges
您将能够在下一次迭代中使用此数据构建 fromOffsets 映射。
您可以在此处查看完整代码和用法:https://spark.apache.org/docs/latest/streaming-kafka-integration.html 在页面末尾
我还没有 100% 解决这个问题,但最好的办法可能是设置 JavaStreamingContext.checkpoint()。
有关示例,请参阅 https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointing。
根据一些博客条目 https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md 有一些警告,但几乎感觉它涉及某些边缘案例,这些案例只是被提及而没有实际解释。
添加到 Michael Kopaniov 的回答中,如果您真的想使用 ZK 作为存储和加载偏移图的位置,您可以。
但是,由于您的结果没有输出到 ZK,除非您的输出操作是幂等的(听起来好像不是),否则您将无法获得可靠的语义。
如果可以在单个原子操作中将结果与偏移量一起存储在 mongo 中的同一个文档中,那可能对您更好。
有关详细信息,请参阅 https://www.youtube.com/watch?v=fXnNEq1v3VA
这里有一些代码可以用来在 ZK 中存储偏移量 http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/
这里有一些代码,您可以在调用 KafkaUtils.createDirectStream 时使用偏移量:
http://geeks.aretotally.in/spark-streaming-direct-api-reusing-offset-from-zookeeper/
我正在使用 Kafka 0.8.2
从 AdExchange 接收数据,然后我使用 Spark Streaming 1.4.1
将数据存储到 MongoDB
。
我的问题是当我重新启动我的 Spark Streaming
作业时,例如更新新版本、修复错误、添加新功能。它将继续读取 kafka
的最新 offset
然后我将在重新启动作业期间丢失数据 AdX 推送到 kafka。
我尝试了类似 auto.offset.reset -> smallest
的方法,但它会从 0 接收 -> 最后数据很大并且在 db 中重复。
我也尝试将特定的 group.id
和 consumer.id
设置为 Spark
但它是一样的。
如何将最近消耗的 offset
火花保存到 zookeeper
或 kafka
然后可以从中读取到最新的 offset
?
createDirectStream 函数的构造函数之一可以获得一个映射,它将分区 ID 作为键,并将您开始使用的偏移量作为值。
只看api这里:http://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html 我所说的地图通常称为:fromOffsets
您可以向地图中插入数据:
startOffsetsMap.put(TopicAndPartition(topicName,partitionId), startOffset)
并在创建直接流时使用它:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
streamingContext, kafkaParams, startOffsetsMap, messageHandler(_))
每次迭代后,您可以使用以下方法获取处理后的偏移量:
rdd.asInstanceOf[HasOffsetRanges].offsetRanges
您将能够在下一次迭代中使用此数据构建 fromOffsets 映射。
您可以在此处查看完整代码和用法:https://spark.apache.org/docs/latest/streaming-kafka-integration.html 在页面末尾
我还没有 100% 解决这个问题,但最好的办法可能是设置 JavaStreamingContext.checkpoint()。
有关示例,请参阅 https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointing。
根据一些博客条目 https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md 有一些警告,但几乎感觉它涉及某些边缘案例,这些案例只是被提及而没有实际解释。
添加到 Michael Kopaniov 的回答中,如果您真的想使用 ZK 作为存储和加载偏移图的位置,您可以。
但是,由于您的结果没有输出到 ZK,除非您的输出操作是幂等的(听起来好像不是),否则您将无法获得可靠的语义。
如果可以在单个原子操作中将结果与偏移量一起存储在 mongo 中的同一个文档中,那可能对您更好。
有关详细信息,请参阅 https://www.youtube.com/watch?v=fXnNEq1v3VA
这里有一些代码可以用来在 ZK 中存储偏移量 http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/
这里有一些代码,您可以在调用 KafkaUtils.createDirectStream 时使用偏移量: http://geeks.aretotally.in/spark-streaming-direct-api-reusing-offset-from-zookeeper/