来自kafka的火花流如何指定轮询事件的截止时间
spark streaming from kafka how to specify cut off time for events polled
我有 spark streaming 应用程序,它运行 End of day 并消耗上游发送的 kafka 事件 application.Currently 上游应用程序一整天都在推送新数据,我的消费者最终消耗了它。我想根据截止时间限制消耗的事件,比如下午 6 点 daily.Is 有一种方法可以指定截止时间,以限制基于截止时间消耗的事件,比如 kafka 事件时间戳或其他东西。下面是消费者代码
KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams))
您可以根据时间戳或时间或任何字段过滤掉处理过程中的事件。例如,假设您的事件是 JSON 并且它有一个名为 hour 的字段,它是事件时间的小时值。您可以轻松地只选择在 6 之前创建的事件,如下所示。
directStream.foreachRDD { rdd =>
val eventDfRDD = rdd.filter(record => {
val option = JSON.parseFull(record).get.asInstanceOf[Map[String, String]]
option.get("hour") < 1800
})
}
当您声明流上下文时,我们可以提及创建 dsstream 的截止时间,我们可以将该值传递给 createDirectStream 参数。请找到代码快照。在下面的代码中,5 秒作为截止时间。所以每 5 秒将创建 DStream RDD。
sc = spark.sparkContext
ssc = StreamingContext(sc,5)
kvs = KafkaUtils.createDirectStream(ssc, ['Topic-name'], {"metadata.broker.list": 'Server-name:port-number'},valueDecoder=serializer.decode_message)
这是我实现的解决方案
1: spark streaming 作业启动时将当前时间存储在变量中
val 截止时间 =System.currentTimeMillis()
2:创建DirectStream
val directKafkaStream= KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams))
3:应用过滤条件
在 foreach 循环中应用如下过滤条件
directKafkaStream.foreachRDD { rdd =>
val filteredRdd = rdd.filter(_.timestamp() < cuttoffTime )
我有 spark streaming 应用程序,它运行 End of day 并消耗上游发送的 kafka 事件 application.Currently 上游应用程序一整天都在推送新数据,我的消费者最终消耗了它。我想根据截止时间限制消耗的事件,比如下午 6 点 daily.Is 有一种方法可以指定截止时间,以限制基于截止时间消耗的事件,比如 kafka 事件时间戳或其他东西。下面是消费者代码
KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams))
您可以根据时间戳或时间或任何字段过滤掉处理过程中的事件。例如,假设您的事件是 JSON 并且它有一个名为 hour 的字段,它是事件时间的小时值。您可以轻松地只选择在 6 之前创建的事件,如下所示。
directStream.foreachRDD { rdd =>
val eventDfRDD = rdd.filter(record => {
val option = JSON.parseFull(record).get.asInstanceOf[Map[String, String]]
option.get("hour") < 1800
})
}
当您声明流上下文时,我们可以提及创建 dsstream 的截止时间,我们可以将该值传递给 createDirectStream 参数。请找到代码快照。在下面的代码中,5 秒作为截止时间。所以每 5 秒将创建 DStream RDD。
sc = spark.sparkContext
ssc = StreamingContext(sc,5)
kvs = KafkaUtils.createDirectStream(ssc, ['Topic-name'], {"metadata.broker.list": 'Server-name:port-number'},valueDecoder=serializer.decode_message)
这是我实现的解决方案
1: spark streaming 作业启动时将当前时间存储在变量中
val 截止时间 =System.currentTimeMillis()
2:创建DirectStream
val directKafkaStream= KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams))
3:应用过滤条件 在 foreach 循环中应用如下过滤条件
directKafkaStream.foreachRDD { rdd => val filteredRdd = rdd.filter(_.timestamp() < cuttoffTime )