Kafka Stream API:KStream 到 KGroupedStream 到 KTable 到 KStream
Kafka Stream API: KStream to KGroupedStream to KTable to KStream
我有一个 Kafka 读数流,我检查是否超过了某个阈值。我只想在第一次超过警报时传播警报。为此,我首先计算新状态,将新状态分组到 KGroupedStream 中。然后减少到一个 KTable,我在其中检查状态是否已更改(保留一个布尔值)并更改为 changelog 流并过滤状态更改的记录。
我的理论是这应该有效,但并不是每个状态更改都传播到变更日志流,而是偶尔更新一次变更日志流(无法真正看到模式)。任何人都知道为什么会这样,或者更好地解决这个问题?
简化示例:
KStream<String, String> inputStream = builder.stream("input");
KStream<String, String> outputStream = inputStream
.groupByKey()
.reduce((previousValue, newValue) -> newValue)
.toStream();
outputStream.to("output");
在这种情况下,我希望每个新的传入值都将放在输出流中。然而,情况并非如此,只是偶尔将一个值放在输出流上。
我猜您启用了缓存缓冲(默认情况下)。
尝试在您的 Streams 配置中配置以下属性。
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
我有一个 Kafka 读数流,我检查是否超过了某个阈值。我只想在第一次超过警报时传播警报。为此,我首先计算新状态,将新状态分组到 KGroupedStream 中。然后减少到一个 KTable,我在其中检查状态是否已更改(保留一个布尔值)并更改为 changelog 流并过滤状态更改的记录。
我的理论是这应该有效,但并不是每个状态更改都传播到变更日志流,而是偶尔更新一次变更日志流(无法真正看到模式)。任何人都知道为什么会这样,或者更好地解决这个问题?
简化示例:
KStream<String, String> inputStream = builder.stream("input");
KStream<String, String> outputStream = inputStream
.groupByKey()
.reduce((previousValue, newValue) -> newValue)
.toStream();
outputStream.to("output");
在这种情况下,我希望每个新的传入值都将放在输出流中。然而,情况并非如此,只是偶尔将一个值放在输出流上。
我猜您启用了缓存缓冲(默认情况下)。 尝试在您的 Streams 配置中配置以下属性。
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);