如何在 Spring Kafka Stream 中配置 UncaughtExceptionHandler
How to configure an UncaughtExceptionHandler in Spring Kafka Stream
我正在使用 spring-kafka
来实现使用 Spring Boot 1.5.16 的流应用程序。我们使用的spring-kafka
版本是1.3.8.RELEASE.
我正在寻找一种方法来关闭启动应用程序,以防出现错误终止与 Kafka Streams 关联的所有线程。我发现在 KafkaStreams
there is the possibility to register a handle for uncaught exceptions. The method is setGlobalStateRestoreListener
.
里面
我看到这个方法暴露在类型KStreamBuilderFactoryBean
的spring-kafka
里面。
我的问题如下。有没有一种简单的方法可以将 UncaughtExceptionHandler
注册为 bean 并让 Spring 正确注入工厂 bean 中?还是我应该自己创建 KStreamBuilderFactoryBean
并手动设置处理程序?
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME)
public KStreamBuilderFactoryBean kStreamBuilderFactoryBean(StreamsConfig streamsConfig) {
final KStreamBuilderFactoryBean streamBuilderFactoryBean = new KStreamBuilderFactoryBean(
streamsConfig);
streamBuilderFactoryBean.setUncaughtExceptionHandler((threadInError, exception) -> {
// Something happens here
});
return streamBuilderFactoryBean;
}
非常感谢。
是的。在那个旧版本中,您必须通过适当的注入自己指定一个 KStreamBuilderFactoryBean
bean,并且正是 KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME
.
在以后的版本中,我们已经有一个 StreamsBuilderFactoryBeanConfigurer
来保持自动配置 KStreamBuilderFactoryBean
,但我们可以根据需要修改它。
更新
您只需在您的应用程序上下文中将其创建为一个 bean,框架就会将其拾取并应用于 StreamsBuilderFactoryBean
:
@Bean
StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
return new StreamsBuilderFactoryBeanConfigurer() {
@Override
public void configure(StreamsBuilderFactoryBean factoryBean) {
factoryBean.setCloseTimeout(...);
}
@Override
public int getOrder() {
return Integer.MAX_VALUE;
}
};
}
我正在使用 spring-kafka
来实现使用 Spring Boot 1.5.16 的流应用程序。我们使用的spring-kafka
版本是1.3.8.RELEASE.
我正在寻找一种方法来关闭启动应用程序,以防出现错误终止与 Kafka Streams 关联的所有线程。我发现在 KafkaStreams
there is the possibility to register a handle for uncaught exceptions. The method is setGlobalStateRestoreListener
.
我看到这个方法暴露在类型KStreamBuilderFactoryBean
的spring-kafka
里面。
我的问题如下。有没有一种简单的方法可以将 UncaughtExceptionHandler
注册为 bean 并让 Spring 正确注入工厂 bean 中?还是我应该自己创建 KStreamBuilderFactoryBean
并手动设置处理程序?
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME)
public KStreamBuilderFactoryBean kStreamBuilderFactoryBean(StreamsConfig streamsConfig) {
final KStreamBuilderFactoryBean streamBuilderFactoryBean = new KStreamBuilderFactoryBean(
streamsConfig);
streamBuilderFactoryBean.setUncaughtExceptionHandler((threadInError, exception) -> {
// Something happens here
});
return streamBuilderFactoryBean;
}
非常感谢。
是的。在那个旧版本中,您必须通过适当的注入自己指定一个 KStreamBuilderFactoryBean
bean,并且正是 KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME
.
在以后的版本中,我们已经有一个 StreamsBuilderFactoryBeanConfigurer
来保持自动配置 KStreamBuilderFactoryBean
,但我们可以根据需要修改它。
更新
您只需在您的应用程序上下文中将其创建为一个 bean,框架就会将其拾取并应用于 StreamsBuilderFactoryBean
:
@Bean
StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
return new StreamsBuilderFactoryBeanConfigurer() {
@Override
public void configure(StreamsBuilderFactoryBean factoryBean) {
factoryBean.setCloseTimeout(...);
}
@Override
public int getOrder() {
return Integer.MAX_VALUE;
}
};
}