Spring 如果启动时没有代理可用,集成 Kafka 3.0.1 -> 3.1.2 测试现在无法启动

Spring Integration Kafka 3.0.1 -> 3.1.2 tests now fail to start if no broker available on startup

项目从Springboot 2.0升级到2.1时,我们也把spring-kafka-integration从3.0.1升级到3.2.1。在这样做的过程中,我们的测试现在都无法启动,原因是:

org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=x] Connection to node -1 could not be established. Broker may not be available.
org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=x] Connection to node -1 could not be established. Broker may not be available.
org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=x] Connection to node -1 could not be established. Broker may not be available.
...
org.springframework.context.ApplicationContextException: Failed to start bean 'eventInboundFlow.kafka:message-driven-channel-adapter#0'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

我们的构建机器在本地没有 Kafka 运行ning,而使用 EmbeddedKafkaBroker 的测试使用自定义 JUnit5 扩展来实现 stops/starts 测试之间的容器侦听器(同时寻找最新的所有分区和主题,以防止测试意外泄漏消息以在以后的测试中破坏预期)。虽然比 @DirtiesContext 快得多,但它不会像 @EmbddedKafka 注释那样在配置期间将自身注入上下文。

在以前的版本中,这不是问题;在配置扩展并启动代理时,我们会看到一些关于无法连接的日志消息,但随后一切正常。

在新版本中,上下文无法完全启动(自定义扩展甚至没有机会 运行)。查看属性,我能看到的关于启动失败的唯一设置是 spring.kafka.admin.fail-fast,但默认情况下为 FALSE,我们不会更改它。

将其与启动项目本身作为 springboot 应用程序进行比较,我看到的第一个区别是容器在 运行 作为应用程序时在自己的线程中启动,但在 main / Test Worker线程时运行作为测试。在以前的版本中,测试也在自己的线程中启动容器。

是否了解为什么现在的测试表现不同?或者是否有办法配置它使它们脱离主线程?

将容器 属性 missingQueuesFatal 设置为 false

参见Spring for Apacher Kafka 2.2. What's new

A new container property (missingTopicsFatal) has been added. See Using KafkaMessageListenerContainer for more information.

Starting with version 2.2, a new container property called missingTopicsFatal has been added (default: true). This prevents the container from starting if any of the configured topics are not present on the broker. It does not apply if the container is configured to listen to a topic pattern (regex). Previously, the container threads looped within the consumer.poll() method waiting for the topic to appear while logging many messages. Aside from the logs, there was no indication that there was a problem. To restore the previous behavior, you can set the property to false.

将 属性 设置为 false 将禁用检查。

使用 spring 引导(例如 2.2.2.RELEASE),您可以使用:

spring.kafka.listener.missing-topics-fatal=false

这将使应用程序继续运行但不会收到有关缺少主题的消息。如果主题添加晚于必须重新启动应用程序才能让消费者接收消息。

8. Integration properties