如何为火花流中的每批数据添加标签?

How to add tags for every batch data in spark streaming?

如果我设置了5秒的批次间隔(Seconds(5)),每5秒,我为当前批次数据添加一个标签。如果我可以为每批数据添加标签,当我使用 window() 功能时,我可以按标签过滤数据。

第 1 5 秒 输入一些数据:

hello
word
hello

为这样的数据添加标签后:

(1st, hello)     // "1st" is the custom tag that can identify this batch data
(1st, word)
(1st, hello)

第2个5秒输入一些数据:

spark
streaming
interval
time

为数据添加标签后:

(2nd, spark)
(2nd, streaming)
(2nd, interval)
(2nd, time)

有3个选项:-

  1. 最好的方法是在消息本身中添加一些标识,这样当您收到消息时,您已经有了可以识别每条消息的东西。
  2. 第二个选项是创建Custom receiver,它可以识别消息 Batch 并添加一些标签,然后进一步将其发送到 Spark Job。
  3. 最后的选择是利用 Accumulator。像这样:-

    val sc = new SparkContext(conf)
    var accum = sc.accumulator(0, "My Accumulator")
    val recDStream = //Write Code to get the Stream
    recDStream.foreachRDD(x => "Data for Batch-"+(accum+=1)+"-"+x)
    //Or may be you can add Accumulator after the forEach, 
    //so that it becomes for a whole Batch something like accum.add(1)