如何使用 Kafka Stream DSL 通过处理器过滤键和值
How to filter keys and value with a Processor using Kafka Stream DSL
我有一个与 StateStore 交互的处理器来过滤消息并对消息执行复杂的逻辑。在 process(key,value)
方法中,我使用 context.forward(key,value)
发送我需要的键和值。出于调试目的,我还打印了那些。
我有一个 KStream mergedStream
,它是由其他两个流的连接产生的。我想将处理器应用于该流的记录。我通过以下方式实现了这一目标:mergedStream.process(myprocessor,"stateStoreName")
当我启动这个程序时,我可以看到正确的值被打印到我的控制台。但是,如果我使用 mergedStream.to("topic")
将 mergedStream 发送到主题,则主题上的值不是我在处理器中转发的值,而是原始值。
我用的是kafka-streams 0.10.1.0.
将我在处理器中转发的值获取到另一个流的最佳方法是什么?
是否可以混用Processor API with the streams created by the KStream DSL?
短:
要解决您的问题,您可以使用 transform(...)
而不是 process(...)
,这样您也可以在 DSL 中访问处理器 API。
长:
如果您使用 process(...)
,则将处理器应用于流——但是,这是一个 "terminating"(或接收器)操作(它的 return 类型是 void
),也就是说,它不会 return 任何结果(这里 "sink" 仅表示运算符没有后继——它并不意味着任何结果都写在某处!)
此外,如果您调用 mergedStream.process(...)
和 mergedStream.to(...)
,您基本上会分支并复制您的流,并将一份副本发送给每个下游运营商(即,一份副本给 process
和一份到 to
.
混合使用 DSL 和处理器 API 是完全可能的(您已经做到了 ;))。但是,使用 process(...)
你不能在 DSL 中使用你 forward(...)
的数据——如果你想使用处理器 API 结果,你可以使用 transform(...)
而不是 process(...)
.
我有一个与 StateStore 交互的处理器来过滤消息并对消息执行复杂的逻辑。在 process(key,value)
方法中,我使用 context.forward(key,value)
发送我需要的键和值。出于调试目的,我还打印了那些。
我有一个 KStream mergedStream
,它是由其他两个流的连接产生的。我想将处理器应用于该流的记录。我通过以下方式实现了这一目标:mergedStream.process(myprocessor,"stateStoreName")
当我启动这个程序时,我可以看到正确的值被打印到我的控制台。但是,如果我使用 mergedStream.to("topic")
将 mergedStream 发送到主题,则主题上的值不是我在处理器中转发的值,而是原始值。
我用的是kafka-streams 0.10.1.0.
将我在处理器中转发的值获取到另一个流的最佳方法是什么?
是否可以混用Processor API with the streams created by the KStream DSL?
短:
要解决您的问题,您可以使用 transform(...)
而不是 process(...)
,这样您也可以在 DSL 中访问处理器 API。
长:
如果您使用 process(...)
,则将处理器应用于流——但是,这是一个 "terminating"(或接收器)操作(它的 return 类型是 void
),也就是说,它不会 return 任何结果(这里 "sink" 仅表示运算符没有后继——它并不意味着任何结果都写在某处!)
此外,如果您调用 mergedStream.process(...)
和 mergedStream.to(...)
,您基本上会分支并复制您的流,并将一份副本发送给每个下游运营商(即,一份副本给 process
和一份到 to
.
混合使用 DSL 和处理器 API 是完全可能的(您已经做到了 ;))。但是,使用 process(...)
你不能在 DSL 中使用你 forward(...)
的数据——如果你想使用处理器 API 结果,你可以使用 transform(...)
而不是 process(...)
.