为什么使用更新输出模式的流式查询会打印出所有行?
Why does streaming query with update output mode print out all rows?
我的目录中有三个文本文件:
a.txt
A B
C D
A E
F
b.txt
A B
C D
A E
c.txt
A B
C D
A E
G
我使用以下流式查询:
val schema = new StructType().add("value", "string")
val lines = spark
.readStream
.schema(schema)
.option("maxFilesPerTrigger", 1)
.text(...)
.as[String]
val wordCounts = lines.flatMap(_.split("\s+")).groupBy("value").count()
val query = wordCounts.writeStream
.queryName("t")
.outputMode("update") // <-- output mode: update
.format("memory")
.start()
while (true) {
spark.sql("select * from t").show(truncate = false)
println(new Date())
Thread.sleep(1000)
}
查询总是输出如下结果:
+-----+-----+
|value|count|
+-----+-----+
|A |2 |
|B |1 |
|C |1 |
|D |1 |
|E |1 |
|A |4 |
|B |2 |
|C |2 |
|D |2 |
|E |2 |
|G |1 |
|A |6 |
|B |3 |
|C |3 |
|D |3 |
|E |3 |
|F |1 |
+-----+-----+
看起来每个文件的结果都附加到输出结果(如 Append
输出模式),我不确定我是否理解 update
模式的含义。 update
输出模式如何工作?
在Append模式下,只有自上次触发后添加到ResultTable的新行才会输出到sink。 只有那些添加到结果 Table 的行永远不会改变的查询才支持此功能。因此,这种模式保证每一行只输出一次。
在更新模式下,只有结果Table中自上次触发后更新的行才会输出到接收器。
为了更好地理解这些模式,我将输出格式更改为 console 并修改了数据,以更新模式执行,结果如下:
a.txt
A B
C D
A E
F X
Y Z
b.txt
A B
C D
A E
c.txt
A B
C D
A E
G
scala> val query = wordCounts.writeStream.queryName("t").outputMode("update").format("console").start()
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1985f8e3
scala> -------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
| F| 1|
| E| 1|
| B| 1|
| Y| 1|
| D| 1|
| C| 1|
| Z| 1|
| A| 2|
| X| 1|
+-----+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
| E| 2|
| B| 2|
| D| 2|
| C| 2|
| A| 4|
+-----+-----+
-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
| E| 3|
| B| 3|
| D| 3|
| C| 3|
| A| 6|
| G| 1|
+-----+-----+
您可以看到,对于每个批次,只有自上次触发后更新的行才会显示在控制台中。 (例如:X、Y、Z 计数未显示在第 1 批和第 2 批中,因为它们未更新)。
在你的例子中,当你将数据写入内存时。由于您不会为每批次逐出内存,因此在查询时也会检索以前的批次数据。希望模式现在清楚了。
我的目录中有三个文本文件:
a.txt
A B
C D
A E
F
b.txt
A B
C D
A E
c.txt
A B
C D
A E
G
我使用以下流式查询:
val schema = new StructType().add("value", "string")
val lines = spark
.readStream
.schema(schema)
.option("maxFilesPerTrigger", 1)
.text(...)
.as[String]
val wordCounts = lines.flatMap(_.split("\s+")).groupBy("value").count()
val query = wordCounts.writeStream
.queryName("t")
.outputMode("update") // <-- output mode: update
.format("memory")
.start()
while (true) {
spark.sql("select * from t").show(truncate = false)
println(new Date())
Thread.sleep(1000)
}
查询总是输出如下结果:
+-----+-----+
|value|count|
+-----+-----+
|A |2 |
|B |1 |
|C |1 |
|D |1 |
|E |1 |
|A |4 |
|B |2 |
|C |2 |
|D |2 |
|E |2 |
|G |1 |
|A |6 |
|B |3 |
|C |3 |
|D |3 |
|E |3 |
|F |1 |
+-----+-----+
看起来每个文件的结果都附加到输出结果(如 Append
输出模式),我不确定我是否理解 update
模式的含义。 update
输出模式如何工作?
在Append模式下,只有自上次触发后添加到ResultTable的新行才会输出到sink。 只有那些添加到结果 Table 的行永远不会改变的查询才支持此功能。因此,这种模式保证每一行只输出一次。
在更新模式下,只有结果Table中自上次触发后更新的行才会输出到接收器。
为了更好地理解这些模式,我将输出格式更改为 console 并修改了数据,以更新模式执行,结果如下:
a.txt
A B
C D
A E
F X
Y Z
b.txt
A B
C D
A E
c.txt
A B
C D
A E
G
scala> val query = wordCounts.writeStream.queryName("t").outputMode("update").format("console").start()
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1985f8e3
scala> -------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
| F| 1|
| E| 1|
| B| 1|
| Y| 1|
| D| 1|
| C| 1|
| Z| 1|
| A| 2|
| X| 1|
+-----+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
| E| 2|
| B| 2|
| D| 2|
| C| 2|
| A| 4|
+-----+-----+
-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
| E| 3|
| B| 3|
| D| 3|
| C| 3|
| A| 6|
| G| 1|
+-----+-----+
您可以看到,对于每个批次,只有自上次触发后更新的行才会显示在控制台中。 (例如:X、Y、Z 计数未显示在第 1 批和第 2 批中,因为它们未更新)。
在你的例子中,当你将数据写入内存时。由于您不会为每批次逐出内存,因此在查询时也会检索以前的批次数据。希望模式现在清楚了。