使用 Kafka Streams DSL 多次使用相同的主题作为源

Use the same topic as a source more than once with Kafka Streams DSL

在使用 Kafka Streams DSL 时,是否可以使用相同的主题作为两个不同处理例程的源?

StreamsBuilder streamsBuilder = new StreamsBuilder();

// use the topic as a stream
streamsBuilder.stream("topic")...

// use the same topic as a source for KTable
streamsBuilder.table("topic")...

return streamsBuilder.build();

上面的简单实现在运行时抛出一个TopologyException无效的拓扑:主题主题已经被另一个来源注册。这是完全有效的,如果我们深入进入底层处理器 API。使用它是唯一的出路吗?

更新: 到目前为止我找到的最接近的替代方案:

StreamsBuilder streamsBuilder = new StreamsBuilder();

final KStream<Object, Object> stream = streamsBuilder.stream("topic");

// use the topic as a stream
stream...

// create a KTable from the KStream
stream.groupByKey().reduce((oldValue, newValue) -> newValue)...

return streamsBuilder.build();

是的,你可以,但为此你需要多个 StreamsBuilder

StreamsBuilder streamsBuilder1 = new StreamsBuilder();
streamsBuilder1.stream("topic");

StreamsBuilder streamsBuilder2 = new StreamsBuilder();
streamsBuilder2.table("topic");

Topology topology1 = streamsBuilder1.build();
Topology topology2 = streamsBuilder2.build();

KafkaStreams kafkaStreams1 = new KafkaStreams(topology1, streamsConfig1);
KafkaStreams kafkaStreams2 = new KafkaStreams(topology2, streamsConfig2);

还要确保每个 StreamsConfig

都有不同的 application.id

阅读与流和 table 相同的主题在语义上有问题恕我直言。 Streams 模型 immutable 事实,而您将用于读入 KTable 模型更新的更新日志主题。

如果您想在多个流中使用同一个主题,您可以多次重复使用同一个 KStream 对象(在语义上类似于广播):

KStream stream = ...
stream.filter();
stream.map();

也比较:https://issues.apache.org/jira/browse/KAFKA-6687(有计划取消这个限制。我怀疑,我们将允许同时使用一个主题作为 KStreamKTable——比较我上面的评论)。