Kafka 侦听器在 spring Spring Kafka 2.2 的引导应用程序启动之前变得活跃。3.RELEASE

Kafka Listeners are getting active before spring boot application startup for Spring Kafka 2.2.3.RELEASE

我们在 spring 启动应用程序(Spring Boot 2.1. 2.RELEASE) 我们在应用程序中定义了一些 kafka 消费者,

@Component
@RefreshScope
@ConditionalOnProperty(prefix = "ord", value = "order-event.messaging.consumer.enabled", havingValue = "true")
public class OrderEventListener {

    @KafkaListener(id = "orderEventListener", topics = "#{'${order.consumer.topic}'}")
    public void consumeMessageEvent(OrderEvent messageEvent,ConsumerRecord<String, ?> record) { 

    // do some further processing.

    } 

}

现在让我们跳到问题,

  1. 所以当我启动应用程序时,在应用程序本身启动时,它会激活这个消费者并尝试连接到代理,但是代理没有启动并且 运行 在我的系统中并且在超时之后,它会抛出错误

    Error creating bean with name 'scopedTarget.orderEventListener'

    并显示 'Application run failed' 并终止进程。

    为什么需要broker up并且运行在启动的时候 申请?

  2. 以前我们使用的是 Spring Kafka 1.1.8.RELEASE,它过去常常在不启动所使用的代理和应用程序的情况下工作才能正常启动。

    这是预期的行为吗?所以行为从版本 1.1.8 更改为 2.2.3 ?

额外的日志,

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.orderEventListener' Initialization of bean failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:584) ~[spring-beans-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:498) ~[spring-beans-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean(AbstractBeanFactory.java:356) ~[spring-beans-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.cloud.context.scope.GenericScope$BeanLifecycleWrapper.getBean(GenericScope.java:390) ~[spring-cloud-context-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.context.scope.GenericScope.get(GenericScope.java:184) ~[spring-cloud-context-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:353) ~[spring-beans-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199) ~[spring-beans-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1083) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.cloud.context.scope.refresh.RefreshScope.eagerlyInitialize(RefreshScope.java:130) ~[spring-cloud-context-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.context.scope.refresh.RefreshScope.start(RefreshScope.java:121) ~[spring-cloud-context-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.context.scope.refresh.RefreshScope.onApplicationEvent(RefreshScope.java:115) ~[spring-cloud-context-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.context.scope.refresh.RefreshScope.onApplicationEvent(RefreshScope.java:71) ~[spring-cloud-context-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:398) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:355) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:882) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:163) ~[spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142) ~[spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at oal.oracle.apps.ic.coll.ordsbx.Application.main(Application.java:59) [classes/:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_161]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_161]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_161]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_161]
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) [spring-boot-devtools-2.1.2.RELEASE.jar:2.1.2.RELEASE]
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

这是由于底层 kafka-clients 库(自 2.0.0 起)发生了变化。

以前,它会永远阻塞,让应用程序完全无法控制,这对许多用户来说是不可接受的。

现在,如果代理不可用,操作将超时。

我不确定为什么它在 bean 创建过程中失败 - 它应该只发生在容器在应用程序生命周期的后期 start()ed 时。但我看到你正在使用 devtools,所以我不确定它是否加剧了问题 - 编辑问题以显示 complete 堆栈跟踪(你应该 never 在这里截断堆栈跟踪,因为我们看不到完整的故事)。

通常,这可以通过将 autoStartup 设置为 false 然后在循环中等待代理存在后自己启动容器来避免。