Kafka Stream 按时计数 window 未报告零值
Kafka Stream count on time window not reporting zero values
我正在使用 Kafka 流来计算过去 3 分钟内使用跳跃时间发生的事件数 window:
public class ViewCountAggregator {
void buildStream(KStreamBuilder builder) {
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
KStream<String, String> views = builder.stream(stringSerde, stringSerde, "streams-view-count-input");
KStream<String, Long> viewCount = views
.groupBy((key, value) -> value)
.count(TimeWindows.of(TimeUnit.MINUTES.toMillis(3)).advanceBy(TimeUnit.MINUTES.toMillis(1)))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), value));
viewCount.to(stringSerde, longSerde, "streams-view-count-output");
}
public static void main(String[] args) throws Exception {
// some not so important initialization code
...
}
}
当 运行 消费者将一些消息推送到输入主题时,它会随着时间的推移接收以下更新:
single 1
single 1
single 1
five 1
five 4
five 5
five 4
five 1
这几乎是正确的,但它从未收到更新:
single 0
five 0
没有它,我更新计数器的消费者将永远不会在较长时间内没有事件时将其设置回零。我希望使用的消息看起来像这样:
single 1
single 1
single 1
single 0
five 1
five 4
five 5
five 4
five 1
five 0
是否有一些我缺少的配置选项/参数可以帮助我实现这种行为?
Which is almost correct, but it never receives updates for:
首先,计算输出是正确的。
二、为什么正确:
如果您应用 windowed 聚合,则只会创建那些 windows 具有实际内容的内容(我熟悉的所有其他系统都会产生相同的输出)。因此,如果对于某个键,在超过 window 大小的时间段内没有数据,则没有 window 实例化,因此也根本没有计数。
如果没有内容则不实例化windows的原因很简单:处理器无法知道所有密钥。在您的示例中,您有两个键,但稍后可能会出现第三个键。您希望从一开始就获得 <thirdKey,0>
吗?此外,由于数据流本质上是无限的,因此密钥可能会消失并且永远不会重新出现。如果你记住所有看到的键,并且在没有消失的键的数据时发出 <key,0>
,你会永远发出 <key,0>
吗?
我不想说你的预期 result/semantics 没有意义。这只是您的一个非常具体的用例,并不普遍适用。因此,流处理器不实现它。
第三:你能做什么?
有多个选项:
- 您的消费者可以跟踪它确实看到了哪些键,并使用嵌入的记录时间戳确定键是否为 "missing",然后将此键的计数器设置为零(为此,它可能还有助于删除
map
步骤并保留密钥的 Windowed<K>
类型,以便消费者获得 window 记录所属的信息)
- 在您的 Stream 应用程序中添加一个有状态的
#transform()
步骤,它执行与 (1) 中所述相同的操作。为此,注册一个标点回拨可能会有所帮助。
方法 (2) 应该更容易跟踪密钥,因为您可以将状态存储附加到转换步骤,因此不需要处理下游消费者中的状态(和 failure/recovery) .
然而,这两种方法的棘手部分仍然是决定何时 缺少密钥,即,您要等多久才能生成 <key,0>
。请注意,数据可能会迟到(也就是无序),即使您确实发出 <key,0>
迟到的记录也可能会产生 <key,1>
消息 after 您的代码确实发出了一条 <key,0>
记录。但也许这对您的情况来说并不是真正的问题,因为您似乎只使用最新的 window。
最后但并非最不重要的一条评论:您似乎只使用了最新的计数,而较新的 windows 覆盖了下游消费者中较旧的 windows。因此,可能值得探索 "Interactive Queries" 直接进入 count
运算符的状态,而不是使用主题并更新其他状态。这可能允许您重新设计并显着简化下游应用程序。查看 docs and a very good blog post about Interactive Queries 了解更多详情。
我正在使用 Kafka 流来计算过去 3 分钟内使用跳跃时间发生的事件数 window:
public class ViewCountAggregator {
void buildStream(KStreamBuilder builder) {
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
KStream<String, String> views = builder.stream(stringSerde, stringSerde, "streams-view-count-input");
KStream<String, Long> viewCount = views
.groupBy((key, value) -> value)
.count(TimeWindows.of(TimeUnit.MINUTES.toMillis(3)).advanceBy(TimeUnit.MINUTES.toMillis(1)))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), value));
viewCount.to(stringSerde, longSerde, "streams-view-count-output");
}
public static void main(String[] args) throws Exception {
// some not so important initialization code
...
}
}
当 运行 消费者将一些消息推送到输入主题时,它会随着时间的推移接收以下更新:
single 1
single 1
single 1
five 1
five 4
five 5
five 4
five 1
这几乎是正确的,但它从未收到更新:
single 0
five 0
没有它,我更新计数器的消费者将永远不会在较长时间内没有事件时将其设置回零。我希望使用的消息看起来像这样:
single 1
single 1
single 1
single 0
five 1
five 4
five 5
five 4
five 1
five 0
是否有一些我缺少的配置选项/参数可以帮助我实现这种行为?
Which is almost correct, but it never receives updates for:
首先,计算输出是正确的。
二、为什么正确:
如果您应用 windowed 聚合,则只会创建那些 windows 具有实际内容的内容(我熟悉的所有其他系统都会产生相同的输出)。因此,如果对于某个键,在超过 window 大小的时间段内没有数据,则没有 window 实例化,因此也根本没有计数。
如果没有内容则不实例化windows的原因很简单:处理器无法知道所有密钥。在您的示例中,您有两个键,但稍后可能会出现第三个键。您希望从一开始就获得 <thirdKey,0>
吗?此外,由于数据流本质上是无限的,因此密钥可能会消失并且永远不会重新出现。如果你记住所有看到的键,并且在没有消失的键的数据时发出 <key,0>
,你会永远发出 <key,0>
吗?
我不想说你的预期 result/semantics 没有意义。这只是您的一个非常具体的用例,并不普遍适用。因此,流处理器不实现它。
第三:你能做什么?
有多个选项:
- 您的消费者可以跟踪它确实看到了哪些键,并使用嵌入的记录时间戳确定键是否为 "missing",然后将此键的计数器设置为零(为此,它可能还有助于删除
map
步骤并保留密钥的Windowed<K>
类型,以便消费者获得 window 记录所属的信息) - 在您的 Stream 应用程序中添加一个有状态的
#transform()
步骤,它执行与 (1) 中所述相同的操作。为此,注册一个标点回拨可能会有所帮助。
方法 (2) 应该更容易跟踪密钥,因为您可以将状态存储附加到转换步骤,因此不需要处理下游消费者中的状态(和 failure/recovery) .
然而,这两种方法的棘手部分仍然是决定何时 缺少密钥,即,您要等多久才能生成 <key,0>
。请注意,数据可能会迟到(也就是无序),即使您确实发出 <key,0>
迟到的记录也可能会产生 <key,1>
消息 after 您的代码确实发出了一条 <key,0>
记录。但也许这对您的情况来说并不是真正的问题,因为您似乎只使用最新的 window。
最后但并非最不重要的一条评论:您似乎只使用了最新的计数,而较新的 windows 覆盖了下游消费者中较旧的 windows。因此,可能值得探索 "Interactive Queries" 直接进入 count
运算符的状态,而不是使用主题并更新其他状态。这可能允许您重新设计并显着简化下游应用程序。查看 docs and a very good blog post about Interactive Queries 了解更多详情。