在修改主题时重置 Spring 引导 Kafka 流应用程序
Reset Spring Boot Kafka Stream Application on modifying topics
我在使用 StreamsBuilderFactoryBean
的 Spring 引导应用程序中使用 spring-kafka
到 运行 Kafka Stream。我通过删除和重新创建它们将一些主题中的分区数从 100 更改为 20,但现在在 运行 应用程序中,我收到以下错误:
Existing internal topic MyAppId-KSTREAM-AGGREGATE-STATE-STORE-0000000092-changelog has invalid partitions: expected: 20; actual: 100. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.
我无法访问 class kafka.tools.StreamsResetter
并尝试调用 StreamsBuilderFactoryBean.getKafkaStreams.cleanup()
但它给了 NullPointerException
。我该如何进行上述清理工作?
相关文档位于 here。
第 1 步:本地清理
对于Spring Boot with StreamsBuilderFactoryBean
,第一步只需在构造函数中添加CleanerConfig
即可:
// Before
new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(config));
// After
new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(config), new CleanupConfig(true, true));
这可以在 start()
之前和 stop()
之后调用 KafkaStreams.cleanUp()
方法。
第 2 步:全局清理
对于第二步,在停止所有应用程序实例的情况下,只需按照文档中的说明使用该工具:
# In kafka directory
bin/kafka-streams-application-reset.sh --application-id "MyAppId" --bootstrap-servers 1.2.3.4:9092 --input-topics x --intermediate-topics first_x,second_x,third_x --zookeeper 1.2.3.4:2181
这是做什么的:
For any specified input topics: Reset the application’s committed consumer offsets to "beginning of the topic" for all partitions (for consumer group application.id).
For any specified intermediate topics: Skip to the end of the topic, i.e. set the application’s committed consumer offsets for all partitions to each partition’s logSize (for consumer group application.id).
For any internal topics: Delete the internal topic (this will also delete committed the corresponding committed offsets).
我在使用 StreamsBuilderFactoryBean
的 Spring 引导应用程序中使用 spring-kafka
到 运行 Kafka Stream。我通过删除和重新创建它们将一些主题中的分区数从 100 更改为 20,但现在在 运行 应用程序中,我收到以下错误:
Existing internal topic MyAppId-KSTREAM-AGGREGATE-STATE-STORE-0000000092-changelog has invalid partitions: expected: 20; actual: 100. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.
我无法访问 class kafka.tools.StreamsResetter
并尝试调用 StreamsBuilderFactoryBean.getKafkaStreams.cleanup()
但它给了 NullPointerException
。我该如何进行上述清理工作?
相关文档位于 here。
第 1 步:本地清理
对于Spring Boot with StreamsBuilderFactoryBean
,第一步只需在构造函数中添加CleanerConfig
即可:
// Before
new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(config));
// After
new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(config), new CleanupConfig(true, true));
这可以在 start()
之前和 stop()
之后调用 KafkaStreams.cleanUp()
方法。
第 2 步:全局清理
对于第二步,在停止所有应用程序实例的情况下,只需按照文档中的说明使用该工具:
# In kafka directory
bin/kafka-streams-application-reset.sh --application-id "MyAppId" --bootstrap-servers 1.2.3.4:9092 --input-topics x --intermediate-topics first_x,second_x,third_x --zookeeper 1.2.3.4:2181
这是做什么的:
For any specified input topics: Reset the application’s committed consumer offsets to "beginning of the topic" for all partitions (for consumer group application.id).
For any specified intermediate topics: Skip to the end of the topic, i.e. set the application’s committed consumer offsets for all partitions to each partition’s logSize (for consumer group application.id).
For any internal topics: Delete the internal topic (this will also delete committed the corresponding committed offsets).