如何为火花流中的每批数据添加标签?
How to add tags for every batch data in spark streaming?
如果我设置了5秒的批次间隔(Seconds(5)
),每5秒,我为当前批次数据添加一个标签。如果我可以为每批数据添加标签,当我使用 window()
功能时,我可以按标签过滤数据。
第 1 5 秒 输入一些数据:
hello
word
hello
为这样的数据添加标签后:
(1st, hello) // "1st" is the custom tag that can identify this batch data
(1st, word)
(1st, hello)
第2个5秒输入一些数据:
spark
streaming
interval
time
为数据添加标签后:
(2nd, spark)
(2nd, streaming)
(2nd, interval)
(2nd, time)
有3个选项:-
- 最好的方法是在消息本身中添加一些标识,这样当您收到消息时,您已经有了可以识别每条消息的东西。
- 第二个选项是创建Custom receiver,它可以识别消息 Batch 并添加一些标签,然后进一步将其发送到 Spark Job。
最后的选择是利用 Accumulator。像这样:-
val sc = new SparkContext(conf)
var accum = sc.accumulator(0, "My Accumulator")
val recDStream = //Write Code to get the Stream
recDStream.foreachRDD(x => "Data for Batch-"+(accum+=1)+"-"+x)
//Or may be you can add Accumulator after the forEach,
//so that it becomes for a whole Batch something like accum.add(1)
如果我设置了5秒的批次间隔(Seconds(5)
),每5秒,我为当前批次数据添加一个标签。如果我可以为每批数据添加标签,当我使用 window()
功能时,我可以按标签过滤数据。
第 1 5 秒 输入一些数据:
hello
word
hello
为这样的数据添加标签后:
(1st, hello) // "1st" is the custom tag that can identify this batch data
(1st, word)
(1st, hello)
第2个5秒输入一些数据:
spark
streaming
interval
time
为数据添加标签后:
(2nd, spark)
(2nd, streaming)
(2nd, interval)
(2nd, time)
有3个选项:-
- 最好的方法是在消息本身中添加一些标识,这样当您收到消息时,您已经有了可以识别每条消息的东西。
- 第二个选项是创建Custom receiver,它可以识别消息 Batch 并添加一些标签,然后进一步将其发送到 Spark Job。
最后的选择是利用 Accumulator。像这样:-
val sc = new SparkContext(conf) var accum = sc.accumulator(0, "My Accumulator") val recDStream = //Write Code to get the Stream recDStream.foreachRDD(x => "Data for Batch-"+(accum+=1)+"-"+x) //Or may be you can add Accumulator after the forEach, //so that it becomes for a whole Batch something like accum.add(1)