Kafka Streams DSL 处理方法是如何工作的?

How does the Kafka Streams DSL process method work?

我一直在使用 Kafka Streams 并获得了一些基本功能,但我在思考 Kafka Streams DSL 中的 process 方法时遇到了一些问题。具体来说:

我知道有两种使用 Kafka Streams 的方式,较低级别的 Processing API 和较高级别的 Streams DSL。在较低级别,您可以更明确地定义拓扑、命名每个节点等,而 Streams DSL 将大部分内容抽象掉。

然而,更高级别的 Streams DSL 有一个名为 process() 的方法,它是一个终端操作(即方法 returns void)。所以我的问题是,处理后的数据 - Processorvoid forward(key, value) 方法发送的数据 - 去了哪里?

在较低级别的处理 API 中,您将处理器命名为节点,并且可以 link Sink 使用该名称,但在 Streams DSL 中没有名称,或者至少none 我能找到。

这是可能的,但是 "clumsy" 到 forward() 数据来自 DSL 中的 Processor。如您所述,process() 方法被定义为终端操作,因此您不应调用 forward()。如果您仍然调用 forward() 并且没有分配下游处理器,则 forward() 基本上是空操作。

但是,与其尝试为 process() 添加下游处理器(这会是一种 hack),您应该使用 transform()(以及相关方法,transformValues()flatTransform(), 和 flatTransformValues()) 而不是 ff 你想向下游发送数据。