Kafka Streams DSL 处理方法是如何工作的?
How does the Kafka Streams DSL process method work?
我一直在使用 Kafka Streams 并获得了一些基本功能,但我在思考 Kafka Streams DSL 中的 process
方法时遇到了一些问题。具体来说:
我知道有两种使用 Kafka Streams 的方式,较低级别的 Processing API 和较高级别的 Streams DSL。在较低级别,您可以更明确地定义拓扑、命名每个节点等,而 Streams DSL 将大部分内容抽象掉。
然而,更高级别的 Streams DSL 有一个名为 process()
的方法,它是一个终端操作(即方法 returns void
)。所以我的问题是,处理后的数据 - Processor
的 void forward(key, value)
方法发送的数据 - 去了哪里?
在较低级别的处理 API 中,您将处理器命名为节点,并且可以 link Sink
使用该名称,但在 Streams DSL 中没有名称,或者至少none 我能找到。
这是可能的,但是 "clumsy" 到 forward()
数据来自 DSL 中的 Processor
。如您所述,process()
方法被定义为终端操作,因此您不应调用 forward()
。如果您仍然调用 forward()
并且没有分配下游处理器,则 forward()
基本上是空操作。
但是,与其尝试为 process()
添加下游处理器(这会是一种 hack),您应该使用 transform()
(以及相关方法,transformValues()
,flatTransform()
, 和 flatTransformValues()
) 而不是 ff 你想向下游发送数据。
我一直在使用 Kafka Streams 并获得了一些基本功能,但我在思考 Kafka Streams DSL 中的 process
方法时遇到了一些问题。具体来说:
我知道有两种使用 Kafka Streams 的方式,较低级别的 Processing API 和较高级别的 Streams DSL。在较低级别,您可以更明确地定义拓扑、命名每个节点等,而 Streams DSL 将大部分内容抽象掉。
然而,更高级别的 Streams DSL 有一个名为 process()
的方法,它是一个终端操作(即方法 returns void
)。所以我的问题是,处理后的数据 - Processor
的 void forward(key, value)
方法发送的数据 - 去了哪里?
在较低级别的处理 API 中,您将处理器命名为节点,并且可以 link Sink
使用该名称,但在 Streams DSL 中没有名称,或者至少none 我能找到。
这是可能的,但是 "clumsy" 到 forward()
数据来自 DSL 中的 Processor
。如您所述,process()
方法被定义为终端操作,因此您不应调用 forward()
。如果您仍然调用 forward()
并且没有分配下游处理器,则 forward()
基本上是空操作。
但是,与其尝试为 process()
添加下游处理器(这会是一种 hack),您应该使用 transform()
(以及相关方法,transformValues()
,flatTransform()
, 和 flatTransformValues()
) 而不是 ff 你想向下游发送数据。