停止 Spring JMS 侦听器,同时仍允许 JMS 消息发布

Stop Spring JMS listener while still allowing JMS message publishing

我正在使用 Spring JMS,我想关闭从队列中检索消息的功能,同时仍然允许从这些侦听器中的相同(或其他)队列上发布新消息。

我有一个 @JmsListener 方法可以将消息从队列中拉出并进行处理。这个 Listener,通过不同的阶段,将使用 JmsTemplate.convertAndSend()

发布新消息
@JmsListener(destination = "queue-name")
public void processJob(Object job) {
    jmsTemplate.convertAndSend("begin");
    // Do some stuff
    jmsTemplate.convertAndSend("almost-done");
    // More stuff!
    jmsTemplate.convertAndSend("done");
}

我也有一个非常简单的侦听器工厂:

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ActiveMQConnectionFactory myFactory, MessageConverter jacksonJmsMessageConverter) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(myFactory);
    factory.setMessageConverter(jacksonJmsMessageConverter);
    return factory;
}

现在,我想要的是阻止侦听器从队列中拉出消息,同时仍然允许方法内的 convertAndSend 调用成功继续。

我唯一看到的停止队列的方法是在 JmsListenerEndpointRegistry bean 上调用 jmsListenerEndpointRegistry.stop();。这将正确地停止队列,并且不会拉取任何消息。但是,似乎停止 JmsListenerEndpointRegistry 也会导致我尝试使用 JmsTemplate.convertAndSend 发布的消息无法发布。

当我在调用 jmsListenerEndpointRegistry.stop(); 之后尝试 convertAndSend 一条消息时,出现以下错误:

o.a.activemq.ActiveMQSessionExecutor     : Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()?

我想这个错误是有道理的。停止端点注册表不仅会关闭侦听器的功能,还会关闭发布消息的功能。

所以我的问题是:鉴于上述侦听器的情况,我如何关闭侦听器以便不处理新消息,同时仍然允许 JmsTemplate.convertAndSend 消息成功发布?

你的 application/environment 有点不寻常。这很好用...

@SpringBootApplication
public class So59863889Application {

    private static final Logger LOG = LoggerFactory.getLogger(So59863889Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So59863889Application.class, args);
    }

    @Autowired
    JmsTemplate template;

    @JmsListener(id = "foo", destination = "foo")
    public void listen1(String in) throws InterruptedException {
        LOG.info(in);
        Thread.sleep(5_000);
        this.template.convertAndSend("bar", in.toUpperCase());
    }

    @JmsListener(id = "bar", destination = "bar")
    public void listen2(String in) throws InterruptedException {
        LOG.info(in);
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template, JmsListenerEndpointRegistry registry) {
        return args -> {
            IntStream.range(0, 20).forEach(i -> template.convertAndSend("foo", "test" + i));
            System.in.read();
            LOG.info("Stopping foo");
            registry.getListenerContainer("foo").stop();
        };
    }

}

sleep() 之后发送(停止容器后工作正常)。

当我覆盖 Boot 的容器工厂时,我得到了相同的结果,但是显式引用 ActiveMQConnectionFactory 对我来说失败了(没有这样的 bean);我必须将其更改为 ConnectionFactory.

如果你能提供一个简单的应用程序(比如我的)来重现这个问题,我可以进一步查看。