Kafka Stream API:KStream 到 KGroupedStream 到 KTable 到 KStream

Kafka Stream API: KStream to KGroupedStream to KTable to KStream

我有一个 Kafka 读数流,我检查是否超过了某个阈值。我只想在第一次超过警报时传播警报。为此,我首先计算新状态,将新状态分组到 KGroupedStream 中。然后减少到一个 KTable,我在其中检查状态是否已更改(保留一个布尔值)并更改为 changelog 流并过滤状态更改的记录。

我的理论是这应该有效,但并不是每个状态更改都传播到变更日志流,而是偶尔更新一次变更日志流(无法真正看到模式)。任何人都知道为什么会这样,或者更好地解决这个问题?

简化示例:

KStream<String, String> inputStream = builder.stream("input");
KStream<String, String> outputStream = inputStream
  .groupByKey()
  .reduce((previousValue, newValue) -> newValue)
  .toStream();
outputStream.to("output");

在这种情况下,我希望每个新的传入值都将放在输出流中。然而,情况并非如此,只是偶尔将一个值放在输出流上。

我猜您启用了缓存缓冲(默认情况下)。 尝试在您的 Streams 配置中配置以下属性。

props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);