Scala Spark Structured Streaming receiving duplicate messages
I am using Kafka with Spark 2.4.5 Structured Streaming to compute an average, but I am running into a problem because the current batch contains duplicate records from the Kafka topic.
For example, in update output mode the following messages arrive in batch 1 from the Kafka topic:
car,Brand=Honda,speed=110,1588569015000000000
car,Brand=ford,speed=90,1588569015000000000
car,Brand=Honda,speed=80,15885690150000000000
Here the result is the average per car brand per timestamp, i.e. grouping on 1588569015000000000 and Brand=Honda, and the result we got was:
(110+90)/2 = 100
Now, in the second batch, late data arrives containing a duplicate message with the same timestamp:
car,Brand=Honda,speed=50,1588569015000000000
car,Brand=Honda,speed=50,1588569015000000000
I expect the average to update to (110+90+50)/3 = 83.33,
but instead it updates to (110+90+50+50)/4 = 75, which is wrong.
My read stream is:
val rawDataStream: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", "topic1") // single input topic
  .option("startingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING) as data")
I then group by timestamp and Brand and write the result to Kafka with a checkpoint, roughly as sketched below.
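A minimal sketch of that grouping and checkpointed Kafka sink; the value parsing, output topic name, and checkpoint path are placeholders rather than the exact code:

import org.apache.spark.sql.functions._

// Sketch only: parse the comma-separated value into columns.
val parsed = rawDataStream
  .select(split(col("data"), ",").as("parts"))
  .select(
    split(col("parts").getItem(1), "=").getItem(1).as("Brand"),
    split(col("parts").getItem(2), "=").getItem(1).cast("double").as("speed"),
    col("parts").getItem(3).as("timestamp"))

// Average speed per timestamp and brand.
val averaged = parsed
  .groupBy("timestamp", "Brand")
  .agg(avg("speed").as("avg_speed"))

// Write the running averages back to Kafka in update mode with a checkpoint.
averaged
  .selectExpr("CAST(Brand AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("topic", "topic1-averages")                        // placeholder topic
  .option("checkpointLocation", "/tmp/checkpoints/averages") // placeholder path
  .outputMode("update")
  .start()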
How can I do this correctly with Spark Structured Streaming, or is there an error in the code?
Spark Structured Streaming supports de-duplication on a streaming DataFrame with dropDuplicates.
You specify the fields that identify a duplicate record; across batches, Spark keeps only the first record for each combination and discards later records with the same values.
The snippet below de-duplicates your streaming DataFrame on the combination of Brand, speed, and timestamp:
rawDataStream.dropDuplicates("Brand", "speed", "timestamp")
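Placed before the aggregation, this could look roughly like the following sketch, reusing the parsed columns from the question; the eventTime conversion and the watermark interval are assumptions:

import org.apache.spark.sql.functions._

// Sketch: de-duplicate before aggregating. Converting the nanosecond epoch to a
// TimestampType column and including it in the de-duplication keys lets the
// watermark clean up old state; without a watermark Spark keeps all seen keys.
val deduped = parsed
  .withColumn("eventTime", (col("timestamp").cast("long") / 1000000000L).cast("timestamp"))
  .withWatermark("eventTime", "10 minutes")   // assumed bound on how late duplicates may arrive
  .dropDuplicates("Brand", "speed", "eventTime")

val averaged = deduped
  .groupBy("timestamp", "Brand")
  .agg(avg("speed").as("avg_speed"))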
See the Spark documentation here.