Apache Kafka (KStreams): How to subscribe to multiple topics?
I have the following code:
// Kafka config setup
Properties props = ...; // setup
List<String> topicList = Arrays.asList("A", "B", "C");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(topicList);
source
    .map((k, v) -> { /* busy code for mapping data */ })
    .transformValues(new MyGenericTransformer())
    .to((k, v, r) -> { /* busy code for topic routing */ });
new KafkaStreams(builder.build(), props).start();
My problem: as soon as I subscribe to more than one topic (i.e. A, B, C above), the KStream code stops receiving records.
Reference: https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsBuilder.html
Relevant documentation:
public <K,V> KStream<K,V> stream(java.util.Collection<java.lang.String> topics)
"If multiple topics are specified there is no ordering guarantee for records from different topics."
What I want to achieve: have a single KStream (i.e. 'source' above) that consumes/processes records from multiple topics.
Do the topics share the same key?
Note that the specified input topics must be partitioned by key. If
this is not the case it is the user's responsibility to repartition
the data before any key based operation (like aggregation or join) is
applied to the returned KStream.
This might be your blocker; a repartitioning sketch follows below.
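If the input topics are not keyed consistently, one option is to re-key the merged stream explicitly before any key-based operation. Below is a minimal sketch, not your actual pipeline, assuming a reasonably recent Kafka Streams version (the Grouped API); the application id, broker address, output topic, and the value-as-key choice are all placeholder assumptions:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class MultiTopicRepartitionSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "multi-topic-demo");   // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // One KStream subscribed to several topics.
        KStream<String, String> source = builder.stream(Arrays.asList("A", "B", "C"));

        // selectKey marks the stream for repartitioning, so the key-based
        // aggregation that follows operates on co-partitioned data.
        KTable<String, Long> counts = source
                .selectKey((key, value) -> value)   // placeholder: pick whatever key fits your data
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .count();

        counts.toStream().to("counts-output", Produced.with(Serdes.String(), Serdes.Long()));  // placeholder topic

        new KafkaStreams(builder.build(), props).start();
    }
}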
Another possible issue could be the consumer group being used.
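In Kafka Streams the consumer group id is the application.id, so committed offsets and assignments from an earlier run under the same id carry over. A quick way to rule that out, sketched below with placeholder values (the application id and broker address are assumptions, not values from the question), is to run the topology under a fresh application.id, i.e. a fresh consumer group:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class FreshGroupIdSketch {
    static Properties freshGroupProps() {
        Properties props = new Properties();
        // In Kafka Streams the consumer group id equals application.id;
        // a new id means a new group with no inherited offsets or assignments.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "multi-topic-demo-v2"); // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        // Optionally pin where consumption starts on topics with no committed offsets.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
        return props;
    }
}

If you need to keep the existing application.id, the kafka-streams-application-reset tool is the supported way to reset its offsets and internal topics instead.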