如何使用 Kafka Stream DSL 通过处理器过滤键和值

How to filter keys and value with a Processor using Kafka Stream DSL

我有一个与 StateStore 交互的处理器来过滤消息并对消息执行复杂的逻辑。在 process(key,value) 方法中,我使用 context.forward(key,value) 发送我需要的键和值。出于调试目的,我还打印了那些。

我有一个 KStream mergedStream,它是由其他两个流的连接产生的。我想将处理器应用于该流的记录。我通过以下方式实现了这一目标:mergedStream.process(myprocessor,"stateStoreName")

当我启动这个程序时,我可以看到正确的值被打印到我的控制台。但是,如果我使用 mergedStream.to("topic") 将 mergedStream 发送到主题,则主题上的值不是我在处理器中转发的值,而是原始值。

我用的是kafka-streams 0.10.1.0.

将我在处理器中转发的值获取到另一个流的最佳方法是什么?

是否可以混用Processor API with the streams created by the KStream DSL

短:

要解决您的问题,您可以使用 transform(...) 而不是 process(...),这样您也可以在 DSL 中访问处理器 API。

:

如果您使用 process(...),则将处理器应用于流——但是,这是一个 "terminating"(或接收器)操作(它的 return 类型是 void),也就是说,它不会 return 任何结果(这里 "sink" 仅表示运算符没有后继——它并不意味着任何结果都写在某处!)

此外,如果您调用 mergedStream.process(...)mergedStream.to(...),您基本上会分支并复制您的流,并将一份副本发送给每个下游运营商(即,一份副本给 process 和一份到 to.

混合使用 DSL 和处理器 API 是完全可能的(您已经做到了 ;))。但是,使用 process(...) 你不能在 DSL 中使用你 forward(...) 的数据——如果你想使用处理器 API 结果,你可以使用 transform(...) 而不是 process(...) .