加水印到底什么时候会丢弃迟到的数据?
When exactly does watermarking drop late data?
这是我用来测试水印的简单代码:
spark.readStream
.textFile("C:\Users\Pavel_Orekhov\Desktop\stream")
.map(_.split(","))
.map(a => (a(0), a(1), a(2)))
.toDF("hour", "hashTag", "userId")
.selectExpr("CAST(hour as TimeStamp)","hashTag", "userId")
.withWatermark("hour", "1 hour")
.groupBy(
window($"hour", "1 hour", "1 hour"),
$"hashTag",
$"userId"
).count()
.writeStream
.outputMode(OutputMode.Update())
.format("console")
.start().processAllAvailable()
文件夹 stream
包含一个文件,内容如下:
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T10:03,pavel,123
1994-12-28T10:03,pavel,123
1994-12-28T11:03,pavel,123
1994-12-28T11:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T06:03,pavel,123
我得到的输出是这样的:
+--------------------+-------+------+-----+
| window|hashTag|userId|count|
+--------------------+-------+------+-----+
|[1994-12-28 09:00...| pavel| 123| 7|
|[1994-12-28 06:00...| pavel| 123| 1|
|[1994-12-28 11:00...| pavel| 123| 2|
|[1994-12-28 10:00...| pavel| 123| 2|
+--------------------+-------+------+-----+
在我读到的文本文件中,您可以看到上午 9 点条目和上午 11 点条目之后的上午 6 点条目。我以为这些会被删除,因为水印应该只更新我们在过去一小时内收到的数据。
那么,为什么它没有被丢弃?
原来这是因为只是一批,里面的数据是无序的。当我创建一个值为 1994-12-28T06:03,pavel,123
的新文件时,它确实被删除了,因为它是新批次的一部分。
这是我用来测试水印的简单代码:
spark.readStream
.textFile("C:\Users\Pavel_Orekhov\Desktop\stream")
.map(_.split(","))
.map(a => (a(0), a(1), a(2)))
.toDF("hour", "hashTag", "userId")
.selectExpr("CAST(hour as TimeStamp)","hashTag", "userId")
.withWatermark("hour", "1 hour")
.groupBy(
window($"hour", "1 hour", "1 hour"),
$"hashTag",
$"userId"
).count()
.writeStream
.outputMode(OutputMode.Update())
.format("console")
.start().processAllAvailable()
文件夹 stream
包含一个文件,内容如下:
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T10:03,pavel,123
1994-12-28T10:03,pavel,123
1994-12-28T11:03,pavel,123
1994-12-28T11:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T06:03,pavel,123
我得到的输出是这样的:
+--------------------+-------+------+-----+
| window|hashTag|userId|count|
+--------------------+-------+------+-----+
|[1994-12-28 09:00...| pavel| 123| 7|
|[1994-12-28 06:00...| pavel| 123| 1|
|[1994-12-28 11:00...| pavel| 123| 2|
|[1994-12-28 10:00...| pavel| 123| 2|
+--------------------+-------+------+-----+
在我读到的文本文件中,您可以看到上午 9 点条目和上午 11 点条目之后的上午 6 点条目。我以为这些会被删除,因为水印应该只更新我们在过去一小时内收到的数据。
那么,为什么它没有被丢弃?
原来这是因为只是一批,里面的数据是无序的。当我创建一个值为 1994-12-28T06:03,pavel,123
的新文件时,它确实被删除了,因为它是新批次的一部分。