Kafka Stream: Kafka Windowed Stream behavior on application restart

I am processing a simple data stream (call it inputStream) like the one below. Note that values are updated for the same key and timestamp.

|-----|--------|-------|
| Key | TS(ms) | Value |
|-----|--------|-------|
|  A  | 1000   |   0   |
|  B  | 1000   |   0   |
|  A  | 61000  |   0   |
|  B  | 61000  |   0   |
|  A  | 121000 |   0   |
|  B  | 121000 |   0   |
|  A  | 1000   |   1   |
|  B  | 1000   |   1   |
|  A  | 61000  |   1   |
|  B  | 61000  |   1   |
|  A  | 121000 |   1   |
|  B  | 121000 |   1   |

The code is as follows:

    // Count records per key in 1-minute tumbling windows with a 1-minute grace period.
    KStream<Windowed<String>, Long> aggregatedStream = inputStream
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
        .count(Materialized.as("count-metric"))
        .toStream();

    aggregatedStream.print(Printed.toSysOut());

The printed output is:

[KTABLE-TOSTREAM-0000000014]: [A@0/60000], 1
[KTABLE-TOSTREAM-0000000014]: [B@0/60000], 1
[KTABLE-TOSTREAM-0000000014]: [A@60000/120000], 1
[KTABLE-TOSTREAM-0000000014]: [B@60000/120000], 1
[KTABLE-TOSTREAM-0000000014]: [A@120000/180000], 1
[KTABLE-TOSTREAM-0000000014]: [B@120000/180000], 1
[KTABLE-TOSTREAM-0000000014]: [A@60000/120000], 2
[KTABLE-TOSTREAM-0000000014]: [B@60000/120000], 2
[KTABLE-TOSTREAM-0000000014]: [A@120000/180000], 2
[KTABLE-TOSTREAM-0000000014]: [B@120000/180000], 2

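Because the grace period is set to 1 minute, the counts for windows [A@0/60000] and [B@0/60000] are not incremented when the value is updated to 1 for the same key and timestamp. Those windows end at 60000 and close at 60000 + 60000 = 120000, and the stream time is already 121000 when the updated records with timestamp 1000 arrive, so they are dropped as late. The output is as expected.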
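The reasoning above can be checked with a small model. This is only a sketch in plain Java, not Kafka Streams code: the class GraceSketch and the method replay are hypothetical names, and the rule "drop a record once stream time has reached window end plus grace" is a simplification of what the windowed aggregation does. It replays the timestamps for key A from the table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of tumbling-window counting with a grace period.
// Not Kafka Streams code: class and method names are hypothetical.
class GraceSketch {
    static final long WINDOW_MS = 60_000L; // window size from the question
    static final long GRACE_MS = 60_000L;  // grace period from the question

    // Replays record timestamps for a single key and returns the emitted updates.
    static List<String> replay(long[] timestamps) {
        Map<Long, Long> counts = new HashMap<>(); // window start -> count
        List<String> emitted = new ArrayList<>();
        long streamTime = -1L; // highest timestamp seen so far
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long start = ts - (ts % WINDOW_MS);
            long end = start + WINDOW_MS;
            // A window stops accepting records once stream time reaches end + grace.
            if (streamTime >= end + GRACE_MS) {
                continue; // late record is dropped
            }
            long count = counts.merge(start, 1L, Long::sum);
            emitted.add(start + "/" + end + ", " + count);
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] keyA = {1000, 61000, 121000, 1000, 61000, 121000};
        replay(keyA).forEach(System.out::println);
    }
}
```

For key A this prints five updates, with no second update for 0/60000, which matches the lines for A in the output above.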

But when I restart my streams application and ingest the same input stream again, I see the following output:

[KTABLE-TOSTREAM-0000000014]: [A@0/60000], 2
[KTABLE-TOSTREAM-0000000014]: [B@0/60000], 2
[KTABLE-TOSTREAM-0000000014]: [A@60000/120000], 2
[KTABLE-TOSTREAM-0000000014]: [B@60000/120000], 2
[KTABLE-TOSTREAM-0000000014]: [A@120000/180000], 2
[KTABLE-TOSTREAM-0000000014]: [B@120000/180000], 2
[KTABLE-TOSTREAM-0000000014]: [A@60000/120000], 3
[KTABLE-TOSTREAM-0000000014]: [B@60000/120000], 3
[KTABLE-TOSTREAM-0000000014]: [A@120000/180000], 3
[KTABLE-TOSTREAM-0000000014]: [B@120000/180000], 3

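Why are windows [A@0/60000] and [B@0/60000] updated to 2 after the restart? Before the restart, the stream time was 121000, so windows [A@0/60000] and [B@0/60000] had already passed their grace period and closed. Why are these windows updated again after the restart?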

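This is a known issue: https://issues.apache.org/jira/browse/KAFKA-9368

On rebalance, the aggregation() operator "forgets" the current stream time, and on restart it only "re-learns" the time based on the new records it sees. This affects how the grace period is applied.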
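To make the effect concrete, here is a minimal sketch of the accept/drop decision for a single late record, assuming the same simplified rule as in the question (a window closes once stream time reaches window end plus grace). It is plain Java, not the Kafka Streams implementation; StreamTimeResetSketch, accepted and UNKNOWN are hypothetical names. It only illustrates why a record for window 0/60000 is rejected while the stream time is remembered and accepted once it has been forgotten. It does not try to reproduce the exact counts printed after the restart.

```java
// Simplified model of the accept/drop decision for a late record.
// Not Kafka Streams code: class and method names are hypothetical.
class StreamTimeResetSketch {
    static final long GRACE_MS = 60_000L; // grace period from the question
    static final long UNKNOWN = -1L;      // models "no stream time observed yet"

    // Returns true if a record with timestamp ts is still accepted into
    // a window ending at windowEnd, given the stream time observed so far.
    static boolean accepted(long observedStreamTime, long ts, long windowEnd) {
        long streamTime = Math.max(observedStreamTime, ts);
        return streamTime < windowEnd + GRACE_MS;
    }

    public static void main(String[] args) {
        // Stream time 121000 is remembered: the record at 1000 is too late.
        System.out.println(accepted(121_000L, 1_000L, 60_000L)); // prints false
        // Stream time was forgotten: it is re-learned as 1000, so the record is accepted.
        System.out.println(accepted(UNKNOWN, 1_000L, 60_000L));  // prints true
    }
}
```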