如何在 Spring Kafka Stream 中配置 UncaughtExceptionHandler

How to configure an UncaughtExceptionHandler in Spring Kafka Stream

我正在使用 spring-kafka 来实现使用 Spring Boot 1.5.16 的流应用程序。我们使用的spring-kafka版本是1.3.8.RELEASE.

我正在寻找一种方法来关闭启动应用程序,以防出现错误终止与 Kafka Streams 关联的所有线程。我发现在 KafkaStreams there is the possibility to register a handle for uncaught exceptions. The method is setGlobalStateRestoreListener.

里面

我看到这个方法暴露在类型KStreamBuilderFactoryBeanspring-kafka里面。

我的问题如下。有没有一种简单的方法可以将 UncaughtExceptionHandler 注册为 bean 并让 Spring 正确注入工厂 bean 中?还是我应该自己创建 KStreamBuilderFactoryBean 并手动设置处理程序?

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME)
public KStreamBuilderFactoryBean kStreamBuilderFactoryBean(StreamsConfig streamsConfig) {
    final KStreamBuilderFactoryBean streamBuilderFactoryBean = new KStreamBuilderFactoryBean(
            streamsConfig);
    streamBuilderFactoryBean.setUncaughtExceptionHandler((threadInError, exception) -> {
        // Something happens here
    });
    return streamBuilderFactoryBean;
}

非常感谢。

是的。在那个旧版本中,您必须通过适当的注入自己指定一个 KStreamBuilderFactoryBean bean,并且正是 KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME.

在以后的版本中,我们已经有一个 StreamsBuilderFactoryBeanConfigurer 来保持自动配置 KStreamBuilderFactoryBean,但我们可以根据需要修改它。

更新

您只需在您的应用程序上下文中将其创建为一个 bean,框架就会将其拾取并应用于 StreamsBuilderFactoryBean:

    @Bean
    StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
        return new StreamsBuilderFactoryBeanConfigurer() {

            @Override
            public void configure(StreamsBuilderFactoryBean factoryBean) {
                factoryBean.setCloseTimeout(...);
            }

            @Override
            public int getOrder() {
                return Integer.MAX_VALUE;
            }

        };
    }