Kafka Streams: Windowed stream behavior on application restart
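I am processing a simple stream of data (call it inputStream) like the one below.
Note that the values are updated for the same keys and timestamps.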
|-----|--------|-------|
| Key | TS(ms) | Value |
|-----|--------|-------|
| A | 1000 | 0 |
| B | 1000 | 0 |
| A | 61000 | 0 |
| B | 61000 | 0 |
| A | 121000 | 0 |
| B | 121000 | 0 |
| A | 1000 | 1 |
| B | 1000 | 1 |
| A | 61000 | 1 |
| B | 61000 | 1 |
| A | 121000 | 1 |
| B | 121000 | 1 |
The code is as follows:
KStream<Windowed<String>, Long> aggregatedStream = inputStream
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
    .count(Materialized.as("count-metric"))
    .toStream();
aggregatedStream.print(Printed.toSysOut());
The printed output is:
[KTABLE-TOSTREAM-0000000014]: [A@0/60000], 1
[KTABLE-TOSTREAM-0000000014]: [B@0/60000], 1
[KTABLE-TOSTREAM-0000000014]: [A@60000/120000], 1
[KTABLE-TOSTREAM-0000000014]: [B@60000/120000], 1
[KTABLE-TOSTREAM-0000000014]: [A@120000/180000], 1
[KTABLE-TOSTREAM-0000000014]: [B@120000/180000], 1
[KTABLE-TOSTREAM-0000000014]: [A@60000/120000], 2
[KTABLE-TOSTREAM-0000000014]: [B@60000/120000], 2
[KTABLE-TOSTREAM-0000000014]: [A@120000/180000], 2
[KTABLE-TOSTREAM-0000000014]: [B@120000/180000], 2
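Because the grace period is set to 1 minute, the counts for windows [A@0/60000] and [B@0/60000] are not incremented when the value is updated to 1 for the same key and timestamp: by then stream time has reached 121000, which is past the window end plus the grace period (60000 + 60000 = 120000). The output is as expected.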
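The arithmetic behind that expectation can be sketched in plain Java. This is a minimal sketch, assuming a window stops accepting records once its end plus the grace period is at or below the observed stream time. The class GraceCheck and its methods are hypothetical names used for illustration and are not part of the Kafka Streams API.

```java
import java.time.Duration;

// Hypothetical helper for illustration only; not part of Kafka Streams.
class GraceCheck {
    static final long WINDOW_SIZE_MS = Duration.ofMinutes(1).toMillis();
    static final long GRACE_MS = Duration.ofMinutes(1).toMillis();

    // Start of the 1-minute tumbling window that contains the timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Assumed rule: the window is expired once windowEnd + grace <= streamTime.
    static boolean isExpired(long recordTimestampMs, long streamTimeMs) {
        long windowEnd = windowStart(recordTimestampMs) + WINDOW_SIZE_MS;
        return windowEnd + GRACE_MS <= streamTimeMs;
    }

    public static void main(String[] args) {
        long streamTimeMs = 121_000L; // highest timestamp seen in the first pass
        for (long ts : new long[] {1_000L, 61_000L, 121_000L}) {
            System.out.println("ts=" + ts
                    + " windowStart=" + windowStart(ts)
                    + " expired=" + isExpired(ts, streamTimeMs));
        }
    }
}
```

Under this assumption, only the record with timestamp 1000 falls into an expired window when stream time is 121000, which is consistent with the first output above: the windows starting at 60000 and 120000 are counted a second time, while the window starting at 0 is not.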
But when I restart my streams application and ingest the same input stream again, I see the following output:
[KTABLE-TOSTREAM-0000000014]: [A@0/60000], 2
[KTABLE-TOSTREAM-0000000014]: [B@0/60000], 2
[KTABLE-TOSTREAM-0000000014]: [A@60000/120000], 2
[KTABLE-TOSTREAM-0000000014]: [B@60000/120000], 2
[KTABLE-TOSTREAM-0000000014]: [A@120000/180000], 2
[KTABLE-TOSTREAM-0000000014]: [B@120000/180000], 2
[KTABLE-TOSTREAM-0000000014]: [A@60000/120000], 3
[KTABLE-TOSTREAM-0000000014]: [B@60000/120000], 3
[KTABLE-TOSTREAM-0000000014]: [A@120000/180000], 3
[KTABLE-TOSTREAM-0000000014]: [B@120000/180000], 3
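Why are windows [A@0/60000] and [B@0/60000] updated to 2 after the restart?
Before the application was restarted, streamTime was 121000, so windows [A@0/60000] and [B@0/60000] were already past the grace period and closed.
Why are these windows considered again after the restart?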
This is a known issue: https://issues.apache.org/jira/browse/KAFKA-9368
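During a rebalance, the aggregation operator "forgets" the current stream time, and after the restart it only "re-learns" the time from the new records it sees. This affects how the grace period is applied.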
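The effect can be reproduced with a toy model. The sketch below is not Kafka Streams code: RestartSketch, process and restart are hypothetical names, and the model simply assumes that the per-window counts survive a restart (as a persisted store would) while the observed stream time is held only in memory and starts over.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model for illustration only; not the Kafka Streams implementation.
class RestartSketch {
    static final long WINDOW_SIZE_MS = 60_000L;
    static final long GRACE_MS = 60_000L;
    static final long NO_TIMESTAMP = -1L;

    // Stands in for the persisted window store: survives restart().
    final Map<String, Long> counts = new LinkedHashMap<>();
    // Held in memory only: reset by restart().
    long observedStreamTime = NO_TIMESTAMP;

    // Returns true if the record was counted, false if it was dropped as late.
    boolean process(String key, long timestampMs) {
        observedStreamTime = Math.max(observedStreamTime, timestampMs);
        long windowStart = timestampMs - (timestampMs % WINDOW_SIZE_MS);
        long windowEnd = windowStart + WINDOW_SIZE_MS;
        if (windowEnd + GRACE_MS <= observedStreamTime) {
            return false; // window end plus grace has already passed
        }
        counts.merge(key + "@" + windowStart + "/" + windowEnd, 1L, Long::sum);
        return true;
    }

    // Models the restart: the counts are kept, the stream time is forgotten.
    void restart() {
        observedStreamTime = NO_TIMESTAMP;
    }

    public static void main(String[] args) {
        RestartSketch app = new RestartSketch();
        for (long ts : new long[] {1_000L, 61_000L, 121_000L}) {
            app.process("A", ts);
        }
        System.out.println("accepted before restart: " + app.process("A", 1_000L));
        app.restart();
        System.out.println("accepted after restart: " + app.process("A", 1_000L));
        System.out.println(app.counts);
    }
}
```

In this model the record with timestamp 1000 is rejected while stream time is still 121000, but it is accepted right after restart() because stream time is rebuilt from that record alone, so the count for A@0/60000 goes to 2.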