Spark Streaming 指定起止偏移量
Spark Streaming specify starting-ending offsets
我有一个场景,我想使用 Spark DStreams 重新处理来自 Kafka 的特定批次数据。
假设我想重新处理以下批次的数据。
主题-Partition1-{1000,2000}
主题-Partition2-{500-600}
下面是我的代码片段,我可以在其中指定起始偏移量。
val inputDStream = KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String, String](
topic-partition-list,
kafkaProps,
starting-offset-ranges))
但是,我想知道的是,无论如何我也可以指定结束偏移量,就像在结构化流批处理模式的情况下一样。
所以本质上,它应该处理这个小批量并停止工作流。
注意:我不想在这个用例中使用结构化流。只想使用 DStreams。
找到方法了。
val offsetRanges = Array(
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange("test", 0, 0, 100),
OffsetRange("test", 1, 0, 100)
)
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
我有一个场景,我想使用 Spark DStreams 重新处理来自 Kafka 的特定批次数据。
假设我想重新处理以下批次的数据。
主题-Partition1-{1000,2000} 主题-Partition2-{500-600}
下面是我的代码片段,我可以在其中指定起始偏移量。
val inputDStream = KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String, String](
topic-partition-list,
kafkaProps,
starting-offset-ranges))
但是,我想知道的是,无论如何我也可以指定结束偏移量,就像在结构化流批处理模式的情况下一样。
所以本质上,它应该处理这个小批量并停止工作流。
注意:我不想在这个用例中使用结构化流。只想使用 DStreams。
找到方法了。
val offsetRanges = Array(
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange("test", 0, 0, 100),
OffsetRange("test", 1, 0, 100)
)
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)