来自kafka的火花流如何指定轮询事件的截止时间

spark streaming from kafka how to specify cut off time for events polled

我有 spark streaming 应用程序,它运行 End of day 并消耗上游发送的 kafka 事件 application.Currently 上游应用程序一整天都在推送新数据,我的消费者最终消耗了它。我想根据截止时间限制消耗的事件,比如下午 6 点 daily.Is 有一种方法可以指定截止时间,以限制基于截止时间消耗的事件,比如 kafka 事件时间戳或其他东西。下面是消费者代码

  KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams))

您可以根据时间戳或时间或任何字段过滤掉处理过程中的事件。例如,假设您的事件是 JSON 并且它有一个名为 hour 的字段,它是事件时间的小时值。您可以轻松地只选择在 6 之前创建的事件,如下所示。

directStream.foreachRDD { rdd =>
        val eventDfRDD = rdd.filter(record => {
          val option = JSON.parseFull(record).get.asInstanceOf[Map[String, String]]
          option.get("hour") < 1800
        })
      }

当您声明流上下文时,我们可以提及创建 dsstream 的截止时间,我们可以将该值传递给 createDirectStream 参数。请找到代码快照。在下面的代码中,5 秒作为截止时间。所以每 5 秒将创建 DStream RDD。

sc = spark.sparkContext
ssc = StreamingContext(sc,5)
kvs = KafkaUtils.createDirectStream(ssc, ['Topic-name'], {"metadata.broker.list": 'Server-name:port-number'},valueDecoder=serializer.decode_message)

这是我实现的解决方案

1: spark streaming 作业启动时将当前时间存储在变量中

val 截止时间 =System.currentTimeMillis()

2:创建DirectStream

val directKafkaStream=   KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams))

3:应用过滤条件 在 foreach 循环中应用如下过滤条件

directKafkaStream.foreachRDD { rdd => val filteredRdd = rdd.filter(_.timestamp() < cuttoffTime )