在修改主题时重置 Spring 引导 Kafka 流应用程序

Reset Spring Boot Kafka Stream Application on modifying topics

我在使用 StreamsBuilderFactoryBean 的 Spring 引导应用程序中使用 spring-kafka 到 运行 Kafka Stream。我通过删除和重新创建它们将一些主题中的分区数从 100 更改为 20,但现在在 运行 应用程序中,我收到以下错误:

Existing internal topic MyAppId-KSTREAM-AGGREGATE-STATE-STORE-0000000092-changelog has invalid partitions: expected: 20; actual: 100. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.

我无法访问 class kafka.tools.StreamsResetter 并尝试调用 StreamsBuilderFactoryBean.getKafkaStreams.cleanup() 但它给了 NullPointerException。我该如何进行上述清理工作?

相关文档位于 here

第 1 步:本地清理

对于Spring Boot with StreamsBuilderFactoryBean,第一步只需在构造函数中添加CleanerConfig即可:

// Before
new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(config));
// After
new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(config), new CleanupConfig(true, true));

这可以在 start() 之前和 stop() 之后调用 KafkaStreams.cleanUp() 方法。

第 2 步:全局清理

对于第二步,在停止所有应用程序实例的情况下,只需按照文档中的说明使用该工具:

# In kafka directory
bin/kafka-streams-application-reset.sh --application-id "MyAppId" --bootstrap-servers 1.2.3.4:9092 --input-topics x --intermediate-topics first_x,second_x,third_x --zookeeper 1.2.3.4:2181

这是做什么的:

For any specified input topics: Reset the application’s committed consumer offsets to "beginning of the topic" for all partitions (for consumer group application.id).

For any specified intermediate topics: Skip to the end of the topic, i.e. set the application’s committed consumer offsets for all partitions to each partition’s logSize (for consumer group application.id).

For any internal topics: Delete the internal topic (this will also delete committed the corresponding committed offsets).