Kafka 消费者在从和直到偏移量已知时从主题读取数据
Kafka consumer to read data from topic when from and until offset is known
我能知道 kafka 消费者是否可以读取特定记录时从和直到一个主题的分区的偏移量是已知的。
用例在我的 spark 流应用程序中,有几批未处理(插入到 table),在这种情况下,我只想读取丢失的数据。我正在存储主题详细信息,即分区和偏移量。
有人可以让我知道在已知偏移量的情况下是否可以从主题中读取这是否可以实现。
如果你想处理一组消息,即由 spark streaming 中的开始和结束偏移量定义,你可以使用以下代码:
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "groupId"
)
val offsetRanges = Array(
OffsetRange("input", 0, 2, 4) // <-- topic name, partition number, fromOffset, untilOffset
)
val sparkContext: SparkContext = ???
val rdd = KafkaUtils.createRDD(sparkContext, kafkaParams.asJava, offsetRanges, PreferConsistent)
// other proccessing and saving
可以找到有关集成 spark streaming 和 Kafka 的更多详细信息:https://spark.apache.org/docs/2.4.0/streaming-kafka-0-10-integration.html
我能知道 kafka 消费者是否可以读取特定记录时从和直到一个主题的分区的偏移量是已知的。
用例在我的 spark 流应用程序中,有几批未处理(插入到 table),在这种情况下,我只想读取丢失的数据。我正在存储主题详细信息,即分区和偏移量。
有人可以让我知道在已知偏移量的情况下是否可以从主题中读取这是否可以实现。
如果你想处理一组消息,即由 spark streaming 中的开始和结束偏移量定义,你可以使用以下代码:
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "groupId"
)
val offsetRanges = Array(
OffsetRange("input", 0, 2, 4) // <-- topic name, partition number, fromOffset, untilOffset
)
val sparkContext: SparkContext = ???
val rdd = KafkaUtils.createRDD(sparkContext, kafkaParams.asJava, offsetRanges, PreferConsistent)
// other proccessing and saving
可以找到有关集成 spark streaming 和 Kafka 的更多详细信息:https://spark.apache.org/docs/2.4.0/streaming-kafka-0-10-integration.html