没有输入主题的拓扑将不会创建流线程和全局线程
Topology with no input topics will create no stream threads and no global thread
我正在编写一个 Kafka Streams 应用程序,我想在此应用程序中包含两个应用程序 ID,但我一直收到错误提示“没有输入主题的拓扑将不创建流线程和全局线程,必须订阅至少一个源主题或全球 table。”你能告诉我我在哪里犯了错误吗?非常感谢!
public class KafkaStreamsConfigurations {
...
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
@Primary
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
setDefaults(props);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "default");
return new KafkaStreamsConfiguration(props);
}
public void setDefaults(Map<String, Object> props) {...}
@Bean("snowplowStreamBuilder")
public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
Map<String, Object> props = new HashMap<>();
setDefaults(props);
...
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
Properties properties = new Properties();
props.forEach(properties::put);
StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean();
streamsBuilderFactoryBean.setStreamsConfiguration(properties);
return streamsBuilderFactoryBean;
}
}
这是我的申请 class。
public class SnowplowStreamsApp {
@Bean("snowplowStreamsApp")
public KStream<String, String> [] startProcessing(
@Qualifier("snowplowStreamBuilder") StreamsBuilder builder) {
KStream<String, String>[] branches = builder.stream(inputTopicPubsubSnowplow, Consumed
.with(Serdes.String(), Serdes.String()))
.mapValues(snowplowEnrichedGoodDataFormatter::formatEnrichedData)
.branch(...);
return branches;
}
}
将您的工厂 bean 命名为 DEFAULT_STREAMS_BUILDER_BEAN_NAME
而不是 snowplowStreamBuilder
- 否则,默认工厂 bean 将在没有定义流的情况下启动。
我正在编写一个 Kafka Streams 应用程序,我想在此应用程序中包含两个应用程序 ID,但我一直收到错误提示“没有输入主题的拓扑将不创建流线程和全局线程,必须订阅至少一个源主题或全球 table。”你能告诉我我在哪里犯了错误吗?非常感谢!
public class KafkaStreamsConfigurations {
...
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
@Primary
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
setDefaults(props);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "default");
return new KafkaStreamsConfiguration(props);
}
public void setDefaults(Map<String, Object> props) {...}
@Bean("snowplowStreamBuilder")
public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
Map<String, Object> props = new HashMap<>();
setDefaults(props);
...
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
Properties properties = new Properties();
props.forEach(properties::put);
StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean();
streamsBuilderFactoryBean.setStreamsConfiguration(properties);
return streamsBuilderFactoryBean;
}
}
这是我的申请 class。
public class SnowplowStreamsApp {
@Bean("snowplowStreamsApp")
public KStream<String, String> [] startProcessing(
@Qualifier("snowplowStreamBuilder") StreamsBuilder builder) {
KStream<String, String>[] branches = builder.stream(inputTopicPubsubSnowplow, Consumed
.with(Serdes.String(), Serdes.String()))
.mapValues(snowplowEnrichedGoodDataFormatter::formatEnrichedData)
.branch(...);
return branches;
}
}
将您的工厂 bean 命名为 DEFAULT_STREAMS_BUILDER_BEAN_NAME
而不是 snowplowStreamBuilder
- 否则,默认工厂 bean 将在没有定义流的情况下启动。