使用 Kafka Streams DSL 多次使用相同的主题作为源
Use the same topic as a source more than once with Kafka Streams DSL
在使用 Kafka Streams DSL 时,是否可以使用相同的主题作为两个不同处理例程的源?
StreamsBuilder streamsBuilder = new StreamsBuilder();
// use the topic as a stream
streamsBuilder.stream("topic")...
// use the same topic as a source for KTable
streamsBuilder.table("topic")...
return streamsBuilder.build();
上面的简单实现在运行时抛出一个TopologyException
:无效的拓扑:主题主题已经被另一个来源注册。这是完全有效的,如果我们深入进入底层处理器 API。使用它是唯一的出路吗?
更新:
到目前为止我找到的最接近的替代方案:
StreamsBuilder streamsBuilder = new StreamsBuilder();
final KStream<Object, Object> stream = streamsBuilder.stream("topic");
// use the topic as a stream
stream...
// create a KTable from the KStream
stream.groupByKey().reduce((oldValue, newValue) -> newValue)...
return streamsBuilder.build();
是的,你可以,但为此你需要多个 StreamsBuilder
StreamsBuilder streamsBuilder1 = new StreamsBuilder();
streamsBuilder1.stream("topic");
StreamsBuilder streamsBuilder2 = new StreamsBuilder();
streamsBuilder2.table("topic");
Topology topology1 = streamsBuilder1.build();
Topology topology2 = streamsBuilder2.build();
KafkaStreams kafkaStreams1 = new KafkaStreams(topology1, streamsConfig1);
KafkaStreams kafkaStreams2 = new KafkaStreams(topology2, streamsConfig2);
还要确保每个 StreamsConfig
都有不同的 application.id
值
阅读与流和 table 相同的主题在语义上有问题恕我直言。 Streams 模型 immutable 事实,而您将用于读入 KTable 模型更新的更新日志主题。
如果您想在多个流中使用同一个主题,您可以多次重复使用同一个 KStream
对象(在语义上类似于广播):
KStream stream = ...
stream.filter();
stream.map();
也比较:https://issues.apache.org/jira/browse/KAFKA-6687(有计划取消这个限制。我怀疑,我们将允许同时使用一个主题作为 KStream
和 KTable
——比较我上面的评论)。
在使用 Kafka Streams DSL 时,是否可以使用相同的主题作为两个不同处理例程的源?
StreamsBuilder streamsBuilder = new StreamsBuilder();
// use the topic as a stream
streamsBuilder.stream("topic")...
// use the same topic as a source for KTable
streamsBuilder.table("topic")...
return streamsBuilder.build();
上面的简单实现在运行时抛出一个TopologyException
:无效的拓扑:主题主题已经被另一个来源注册。这是完全有效的,如果我们深入进入底层处理器 API。使用它是唯一的出路吗?
更新: 到目前为止我找到的最接近的替代方案:
StreamsBuilder streamsBuilder = new StreamsBuilder();
final KStream<Object, Object> stream = streamsBuilder.stream("topic");
// use the topic as a stream
stream...
// create a KTable from the KStream
stream.groupByKey().reduce((oldValue, newValue) -> newValue)...
return streamsBuilder.build();
是的,你可以,但为此你需要多个 StreamsBuilder
StreamsBuilder streamsBuilder1 = new StreamsBuilder();
streamsBuilder1.stream("topic");
StreamsBuilder streamsBuilder2 = new StreamsBuilder();
streamsBuilder2.table("topic");
Topology topology1 = streamsBuilder1.build();
Topology topology2 = streamsBuilder2.build();
KafkaStreams kafkaStreams1 = new KafkaStreams(topology1, streamsConfig1);
KafkaStreams kafkaStreams2 = new KafkaStreams(topology2, streamsConfig2);
还要确保每个 StreamsConfig
application.id
值
阅读与流和 table 相同的主题在语义上有问题恕我直言。 Streams 模型 immutable 事实,而您将用于读入 KTable 模型更新的更新日志主题。
如果您想在多个流中使用同一个主题,您可以多次重复使用同一个 KStream
对象(在语义上类似于广播):
KStream stream = ...
stream.filter();
stream.map();
也比较:https://issues.apache.org/jira/browse/KAFKA-6687(有计划取消这个限制。我怀疑,我们将允许同时使用一个主题作为 KStream
和 KTable
——比较我上面的评论)。