Spark Structured Streaming watermark and dropDuplicates?
I am trying to drop duplicates with a watermark, but the problem is that the watermark does not seem to clear the state.
My code is:
import org.apache.log4j.LogManager
import org.apache.spark.sql.{DataFrame, SparkSession}

def main(args: Array[String]): Unit = {
  @transient lazy val log = LogManager.getRootLogger
  val spark = SparkSession
    .builder
    .master("local[2]")
    .appName("RateResource")
    .getOrCreate()
  import spark.implicits._

  val rateData: DataFrame = spark.readStream.format("rate").load()
  val transData = rateData
    .select($"timestamp" as "wtimestamp", $"value",
      $"value" % 1000 % 100 % 10 as "key",
      $"value" % 1000 % 100 / 10 % 2 as "dkey")
    .where("key = 0")
  val selectData = transData
    .withWatermark("wtimestamp", "20 seconds")
    .dropDuplicates("dkey", "wtimestamp")

  val query = selectData.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", "false")
    .start()

  query.awaitTermination()
}
And the input records are:
2017-08-09 10:00:10,10
2017-08-09 10:00:20,20
2017-08-09 10:00:30,10
2017-08-09 10:00:10,10
2017-08-09 11:00:30,40
2017-08-09 10:00:10,10
The first "2017-08-09 10:00:10,10" is output, but the second "2017-08-09 10:00:10,10", arriving more than 10 seconds later, is not output:
-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+-----+---+----+
|wtimestamp |value|key|dkey|
+-------------------+-----+---+----+
|2017-08-09 10:00:10|10 |0.0|1.0 |
+-------------------+-----+---+----+
-------------------------------------------
Batch: 2
-------------------------------------------
+----------+-----+---+----+
|wtimestamp|value|key|dkey|
+----------+-----+---+----+
+----------+-----+---+----+
-------------------------------------------
Batch: 3
-------------------------------------------
+-------------------+-----+---+----+
|wtimestamp |value|key|dkey|
+-------------------+-----+---+----+
|2017-08-09 10:00:20|20 |0.0|0.0 |
+-------------------+-----+---+----+
-------------------------------------------
Batch: 4
-------------------------------------------
+----------+-----+---+----+
|wtimestamp|value|key|dkey|
+----------+-----+---+----+
+----------+-----+---+----+
-------------------------------------------
Batch: 5
-------------------------------------------
+-------------------+-----+---+----+
|wtimestamp |value|key|dkey|
+-------------------+-----+---+----+
|2017-08-09 10:00:30|10 |0.0|1.0 |
+-------------------+-----+---+----+
-------------------------------------------
Batch: 6
-------------------------------------------
+----------+-----+---+----+
|wtimestamp|value|key|dkey|
+----------+-----+---+----+
+----------+-----+---+----+
-------------------------------------------
Batch: 7
-------------------------------------------
+----------+-----+---+----+
|wtimestamp|value|key|dkey|
+----------+-----+---+----+
+----------+-----+---+----+
-------------------------------------------
Batch: 8
-------------------------------------------
+-------------------+-----+---+----+
|wtimestamp |value|key|dkey|
+-------------------+-----+---+----+
|2017-08-09 11:00:30|40 |0.0|0.0 |
+-------------------+-----+---+----+
With windows I know the watermark drops state based on the max event time, but with dropDuplicates I don't understand how it clears the state.
The dropDuplicates operator clears its state using the watermark. In your code, the watermark declared before dropDuplicates is 20 seconds, so Spark steps back 20 seconds from the current maximum event time: incoming data is compared against the state kept for the last 20 seconds, and any state entries older than that are cleared.
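That eviction rule can be sketched with a simplified, single-threaded model. This is an illustration of the idea only, not Spark's actual StateStore implementation, and the names (`WatermarkDedupSketch`, `process`) and the exact ordering of the late-data check versus eviction are assumptions:

```scala
// Simplified model (not Spark's real implementation) of how dropDuplicates
// state could be evicted by a 20-second watermark.
object WatermarkDedupSketch {
  val delayMs = 20000L                       // withWatermark("wtimestamp", "20 seconds")
  var maxEventTime = 0L                      // max event time observed so far
  var state = Map.empty[(Int, Long), Long]   // dedup key (dkey, wtimestamp) -> event time

  /** Returns true if the record is emitted, false if it is dropped. */
  def process(dkey: Int, eventTime: Long): Boolean = {
    val watermark = maxEventTime - delayMs   // watermark derived from data seen so far
    maxEventTime = math.max(maxEventTime, eventTime)
    val emitted =
      if (eventTime < watermark) false                    // too late: filtered out entirely
      else if (state.contains((dkey, eventTime))) false   // duplicate still held in state
      else { state += ((dkey, eventTime) -> eventTime); true }
    // entries that fall behind the advanced watermark are evicted from state
    state = state.filter { case (_, t) => t >= maxEventTime - delayMs }
    emitted
  }

  def main(args: Array[String]): Unit = {
    // replay the question's records as millisecond offsets from 10:00:00
    println(process(1, 10000L))    // 10:00:10 -> true  (emitted)
    println(process(0, 20000L))    // 10:00:20 -> true
    println(process(1, 30000L))    // 10:00:30 -> true
    println(process(1, 10000L))    // 10:00:10 again -> false (still in state)
    println(process(0, 3630000L))  // 11:00:30 -> true, watermark jumps far ahead
    println(process(1, 10000L))    // 10:00:10 again -> false (now below the watermark)
  }
}
```

In this model the repeated "10:00:10" record never comes back: first it is rejected as a duplicate while its state entry is still inside the 20-second window, and after the "11:00:30" record advances the watermark it is dropped as late data before deduplication even runs, which matches the empty batches you see.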