停止 Spring JMS 侦听器,同时仍允许 JMS 消息发布
Stop Spring JMS listener while still allowing JMS message publishing
我正在使用 Spring JMS,我想关闭从队列中检索消息的功能,同时仍然允许从这些侦听器中的相同(或其他)队列上发布新消息。
我有一个 @JmsListener
方法可以将消息从队列中拉出并进行处理。这个 Listener,通过不同的阶段,将使用 JmsTemplate.convertAndSend()
发布新消息
@JmsListener(destination = "queue-name")
public void processJob(Object job) {
jmsTemplate.convertAndSend("begin");
// Do some stuff
jmsTemplate.convertAndSend("almost-done");
// More stuff!
jmsTemplate.convertAndSend("done");
}
我也有一个非常简单的侦听器工厂:
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ActiveMQConnectionFactory myFactory, MessageConverter jacksonJmsMessageConverter) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(myFactory);
factory.setMessageConverter(jacksonJmsMessageConverter);
return factory;
}
现在,我想要的是阻止侦听器从队列中拉出消息,同时仍然允许方法内的 convertAndSend
调用成功继续。
我唯一看到的停止队列的方法是在 JmsListenerEndpointRegistry
bean 上调用 jmsListenerEndpointRegistry.stop();
。这将正确地停止队列,并且不会拉取任何消息。但是,似乎停止 JmsListenerEndpointRegistry
也会导致我尝试使用 JmsTemplate.convertAndSend
发布的消息无法发布。
当我在调用 jmsListenerEndpointRegistry.stop();
之后尝试 convertAndSend
一条消息时,出现以下错误:
o.a.activemq.ActiveMQSessionExecutor : Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()?
我想这个错误是有道理的。停止端点注册表不仅会关闭侦听器的功能,还会关闭发布消息的功能。
所以我的问题是:鉴于上述侦听器的情况,我如何关闭侦听器以便不处理新消息,同时仍然允许 JmsTemplate.convertAndSend
消息成功发布?
你的 application/environment 有点不寻常。这很好用...
@SpringBootApplication
public class So59863889Application {
private static final Logger LOG = LoggerFactory.getLogger(So59863889Application.class);
public static void main(String[] args) {
SpringApplication.run(So59863889Application.class, args);
}
@Autowired
JmsTemplate template;
@JmsListener(id = "foo", destination = "foo")
public void listen1(String in) throws InterruptedException {
LOG.info(in);
Thread.sleep(5_000);
this.template.convertAndSend("bar", in.toUpperCase());
}
@JmsListener(id = "bar", destination = "bar")
public void listen2(String in) throws InterruptedException {
LOG.info(in);
}
@Bean
public ApplicationRunner runner(JmsTemplate template, JmsListenerEndpointRegistry registry) {
return args -> {
IntStream.range(0, 20).forEach(i -> template.convertAndSend("foo", "test" + i));
System.in.read();
LOG.info("Stopping foo");
registry.getListenerContainer("foo").stop();
};
}
}
sleep()
之后发送(停止容器后工作正常)。
当我覆盖 Boot 的容器工厂时,我得到了相同的结果,但是显式引用 ActiveMQConnectionFactory
对我来说失败了(没有这样的 bean);我必须将其更改为 ConnectionFactory
.
如果你能提供一个简单的应用程序(比如我的)来重现这个问题,我可以进一步查看。
我正在使用 Spring JMS,我想关闭从队列中检索消息的功能,同时仍然允许从这些侦听器中的相同(或其他)队列上发布新消息。
我有一个 @JmsListener
方法可以将消息从队列中拉出并进行处理。这个 Listener,通过不同的阶段,将使用 JmsTemplate.convertAndSend()
@JmsListener(destination = "queue-name")
public void processJob(Object job) {
jmsTemplate.convertAndSend("begin");
// Do some stuff
jmsTemplate.convertAndSend("almost-done");
// More stuff!
jmsTemplate.convertAndSend("done");
}
我也有一个非常简单的侦听器工厂:
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ActiveMQConnectionFactory myFactory, MessageConverter jacksonJmsMessageConverter) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(myFactory);
factory.setMessageConverter(jacksonJmsMessageConverter);
return factory;
}
现在,我想要的是阻止侦听器从队列中拉出消息,同时仍然允许方法内的 convertAndSend
调用成功继续。
我唯一看到的停止队列的方法是在 JmsListenerEndpointRegistry
bean 上调用 jmsListenerEndpointRegistry.stop();
。这将正确地停止队列,并且不会拉取任何消息。但是,似乎停止 JmsListenerEndpointRegistry
也会导致我尝试使用 JmsTemplate.convertAndSend
发布的消息无法发布。
当我在调用 jmsListenerEndpointRegistry.stop();
之后尝试 convertAndSend
一条消息时,出现以下错误:
o.a.activemq.ActiveMQSessionExecutor : Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()?
我想这个错误是有道理的。停止端点注册表不仅会关闭侦听器的功能,还会关闭发布消息的功能。
所以我的问题是:鉴于上述侦听器的情况,我如何关闭侦听器以便不处理新消息,同时仍然允许 JmsTemplate.convertAndSend
消息成功发布?
你的 application/environment 有点不寻常。这很好用...
@SpringBootApplication
public class So59863889Application {
private static final Logger LOG = LoggerFactory.getLogger(So59863889Application.class);
public static void main(String[] args) {
SpringApplication.run(So59863889Application.class, args);
}
@Autowired
JmsTemplate template;
@JmsListener(id = "foo", destination = "foo")
public void listen1(String in) throws InterruptedException {
LOG.info(in);
Thread.sleep(5_000);
this.template.convertAndSend("bar", in.toUpperCase());
}
@JmsListener(id = "bar", destination = "bar")
public void listen2(String in) throws InterruptedException {
LOG.info(in);
}
@Bean
public ApplicationRunner runner(JmsTemplate template, JmsListenerEndpointRegistry registry) {
return args -> {
IntStream.range(0, 20).forEach(i -> template.convertAndSend("foo", "test" + i));
System.in.read();
LOG.info("Stopping foo");
registry.getListenerContainer("foo").stop();
};
}
}
sleep()
之后发送(停止容器后工作正常)。
当我覆盖 Boot 的容器工厂时,我得到了相同的结果,但是显式引用 ActiveMQConnectionFactory
对我来说失败了(没有这样的 bean);我必须将其更改为 ConnectionFactory
.
如果你能提供一个简单的应用程序(比如我的)来重现这个问题,我可以进一步查看。