为什么使用 containerGroup 会阻止我的其他侦听器工作?
Why using containerGroup is preventing my other listeners to work?
我的应用程序正在收听多个主题。
其中一些是压缩主题,用于在内存中加载一些数据。
我想先加载这些数据,所以我使用 SmartLifecycle 在其他容器之前手动启动这些容器。
效果很好,但为简单起见,我尝试使用容器组
@KafkaListener(id = "myId", containerGroup = "compacted", ...)
然后在我使用的 SmartLifecycle bean 中:
Collection<MessageListenerContainer> compactedListenerContainers = applicationContext.getBean("compacted", Collection.class);
但是一旦我这样做了,在 "start" 方法完成后,其他容器将永远不会启动。
如果我将此行替换为:
Collection<MessageListenerContainer> compactedListenerContainers = Arrays.asList(registry.getListenerContainer("myId"));
正在运行。
知道为什么获取 containerGroup 的 bean 会阻止所有其他侦听器工作吗?知道所有其他 @KafkaListeners 只是由 :
定义
@KafkaListener(topics = "myTopic")
编辑
经进一步排查,问题与KafkaListenerEndpointRegistry有关。
如果 SmartLifeCycle bean 是使用 "KafkaListenerEndpointRegistry" 作为依赖项创建的,则应用程序正在运行。即使我根本不使用注册表。
但如果在没有此注册表的情况下创建 SmartLifeCycle bean,应用程序将失败。
你需要展示你的容器工厂。
我假设您已将 autoStartup
设置为 false,因为您是手动启动它们。
所以其他人也不会开始;因为您想在加载压缩主题后启动它们,只需在端点注册表上调用 start()
,它将启动其他主题。
或者你可以把其他的放在另一个containerGroup
。
我的应用程序正在收听多个主题。 其中一些是压缩主题,用于在内存中加载一些数据。
我想先加载这些数据,所以我使用 SmartLifecycle 在其他容器之前手动启动这些容器。
效果很好,但为简单起见,我尝试使用容器组
@KafkaListener(id = "myId", containerGroup = "compacted", ...)
然后在我使用的 SmartLifecycle bean 中:
Collection<MessageListenerContainer> compactedListenerContainers = applicationContext.getBean("compacted", Collection.class);
但是一旦我这样做了,在 "start" 方法完成后,其他容器将永远不会启动。
如果我将此行替换为:
Collection<MessageListenerContainer> compactedListenerContainers = Arrays.asList(registry.getListenerContainer("myId"));
正在运行。
知道为什么获取 containerGroup 的 bean 会阻止所有其他侦听器工作吗?知道所有其他 @KafkaListeners 只是由 :
定义@KafkaListener(topics = "myTopic")
编辑
经进一步排查,问题与KafkaListenerEndpointRegistry有关。
如果 SmartLifeCycle bean 是使用 "KafkaListenerEndpointRegistry" 作为依赖项创建的,则应用程序正在运行。即使我根本不使用注册表。
但如果在没有此注册表的情况下创建 SmartLifeCycle bean,应用程序将失败。
你需要展示你的容器工厂。
我假设您已将 autoStartup
设置为 false,因为您是手动启动它们。
所以其他人也不会开始;因为您想在加载压缩主题后启动它们,只需在端点注册表上调用 start()
,它将启动其他主题。
或者你可以把其他的放在另一个containerGroup
。