Kafka Streams DSL:如何关闭流 branching/grouping 创建的子流?

Kafka Streams DSL: How to close sub-streams created by stream branching/grouping?

我正在用 Scala 编写 Kafka Streams 应用程序,我担心潜在的内存泄漏/整体资源使用。

有没有办法向Kafka发信号给"close"由grouping/branching操作创建的特定子流并释放相关资源?

为了演示潜在问题,让我们考虑一个将订单状态更改事件推送到名为 "my-super-input-topic" 的 Kafka 主题的电子商务应用程序。每个订单由 OrderId 唯一标识,用作 Kafka 消息密钥。

假设我们需要计算每个订单的状态更新计数并将结果推送到 "my-super-output-topic" 主题。以下代码片段演示了如何在 Scala 中执行此操作:

// ...
val builder = new StreamsBuilder
val ktable = builder.stream("my-super-input-topic")
    .groupByKey
    .count

ktable.toStream.to("my-super-output-topic")
// ...

据我了解,.groupBy / .groupByKey 将源流分成 N 个子流(在我们的例子中,每个订单一个)。上面的代码没有指定任何保留 windows,因此即使给定的订单(子流)在几小时不活动后收到一个事件 - 它仍然会被正确处理并且更新将被推送到一个接收器主题,包含正确的汇总计数。

因此我得出结论,Kafka 在某种内部存储中维护有关每个子流的信息。

但是,订单的生命周期是有限的,并且在一段时间后订单完成,这意味着与该订单相关的子流将永远不会收到进一步的事件。但 Kafka 仍然将其视为有效并等待进一步的消息,并且随着越来越多的订单完成,越来越多的 "dead" 子流将被累积。如果 Kafka 至少投入一些资源来跟踪每个子流,"dead" 个子流可能会导致大量内存使用,即使这完全没有必要。

因此,一旦系统了解相关订单已完成,dispose/close 特定子流是合理的。

注意:这是一个虚构的用例,用于演示特定问题,而非真实任务。请不要建议在没有 Kafka Streams 的情况下实现它。

您的聚合将永远保留每个键的计数是正确的。然而,"sub-stream" 是基于每个分区的,因此,每个子流应该总是包含一些数据。

无法关闭部分拓扑。

如果您担心 KTable 的存储无限增长,您可以考虑 (1) 使用最终会清除旧数据的窗口存储,(2) 使用 aggregate() 而不是计数:默认情况下 aggregate() 只会计数,但如果订单已完成,UDF 可以 return null -- 这将删除订单的键值对从商店。 (3) 或者您考虑不使用 DSL,而是使用提供更多 control/flexibility 的处理器 API 来维护状态存储(您也可以考虑使用 "punctuations")。

您可能还对以下内容感兴趣:https://issues.apache.org/jira/browse/KAFKA-4212