Spark Streaming 指定起止偏移量

Spark Streaming specify starting-ending offsets

我有一个场景,我想使用 Spark DStreams 重新处理来自 Kafka 的特定批次数据。

假设我想重新处理以下批次的数据。

主题-Partition1-{1000,2000} 主题-Partition2-{500-600}

下面是我的代码片段,我可以在其中指定起始偏移量。

val inputDStream = KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Assign[String, String](
      topic-partition-list,
      kafkaProps,
      starting-offset-ranges))

但是,我想知道的是,无论如何我也可以指定结束偏移量,就像在结构化流批处理模式的情况下一样。

所以本质上,它应该处理这个小批量并停止工作流。

注意:我不想在这个用例中使用结构化流。只想使用 DStreams。

找到方法了。

val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)