Java 的 Kafka Stream:发送到多个主题
Kafka Stream with Java: Send TO multiple topics
我正在尝试使用 Java 使用 Kafka Streams 将我的消息输出到多个主题,例如:
myStream.to(
(key, message, recordContext) -> {
return getMessageTargetTopics(message);
},
Produced.with(Serdes.String(), jsonSerde)
);
问题是 getMessageTargetTopics
returns 主题列表和方法 to
不接受列表...
我看到已经有针对此的 issue,但我想知道同时是否有任何可能的解决方案。有什么想法吗?
非常感谢!
您可以使用KStream:flatMapValues(...)
来计算每个值的主题。主题名称应该用值包装,然后再打开。
注意事项:
这种方法有一个缺点 - 您的消息的值将包含主题名称,因此使用这些消息的应用程序具有提取商业价值
示例代码如下所示:
myStream.flatMapValues(value -> getMessageTargetTopicsWrappedWithValue(value))
.to((key, value, recordContext) -> value.getTopicName,
Produced.with(Serdes.String(), jsonSerde));
由于简单的解决方案似乎不起作用:您应该有一个列表,其中包含 getMessageTargetTopics 可能返回的所有主题。
解决方案 1 (Streams DSL,但并非如此 "clean"):使用分支来实现该逻辑 select 主题。
解决方案 2(IHMO 更干净):将所有主题添加为接收器并构建一个自定义处理器,转发到相应的接收器node(s) - 像 context.forward(key, value, To.child("topic")) 循环主题列表应该做到这一点。
我正在尝试使用 Java 使用 Kafka Streams 将我的消息输出到多个主题,例如:
myStream.to(
(key, message, recordContext) -> {
return getMessageTargetTopics(message);
},
Produced.with(Serdes.String(), jsonSerde)
);
问题是 getMessageTargetTopics
returns 主题列表和方法 to
不接受列表...
我看到已经有针对此的 issue,但我想知道同时是否有任何可能的解决方案。有什么想法吗?
非常感谢!
您可以使用KStream:flatMapValues(...)
来计算每个值的主题。主题名称应该用值包装,然后再打开。
注意事项: 这种方法有一个缺点 - 您的消息的值将包含主题名称,因此使用这些消息的应用程序具有提取商业价值
示例代码如下所示:
myStream.flatMapValues(value -> getMessageTargetTopicsWrappedWithValue(value))
.to((key, value, recordContext) -> value.getTopicName,
Produced.with(Serdes.String(), jsonSerde));
由于简单的解决方案似乎不起作用:您应该有一个列表,其中包含 getMessageTargetTopics 可能返回的所有主题。
解决方案 1 (Streams DSL,但并非如此 "clean"):使用分支来实现该逻辑 select 主题。
解决方案 2(IHMO 更干净):将所有主题添加为接收器并构建一个自定义处理器,转发到相应的接收器node(s) - 像 context.forward(key, value, To.child("topic")) 循环主题列表应该做到这一点。