How to stop and restart consuming messages from RabbitMQ with @RabbitListener
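I am able to stop consuming and then start consuming again. The problem is that after I restart, I can process the messages that had already been published, but messages published afterwards are not processed.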
import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class RabbitMqueue implements Consumer {

    int count = 0;

    // Consumer callback methods (handleDelivery, etc.) are not shown.

    @RabbitListener(queues = "dataQueue")
    public void receivedData(@Payload Event msg, Channel channel,
            @Header(AmqpHeaders.CONSUMER_TAG) String tag) throws IOException,
            InterruptedException {
        count++;
        System.out.println("\n Message received from the dataQueue is " + msg);
        // Cancelling the consumer works fine.
        if (count == 1) {
            channel.basicCancel(tag);
            System.out.println("Consumer is cancelled");
        }
        count++;
        System.out.println("\n count is " + count + "\n");
        Thread.sleep(5000);
        // Restarting the consumer: messages that were already published are
        // processed, but newly published messages are not. A new message moves
        // from "ready" to "unacked" on the broker, yet nothing happens on the
        // consumer side.
        if (count == 2) {
            channel.basicConsume("dataQueue", this);
            System.out.println("Consumer is started ");
        }
    }
}
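You must not call channel.basicCancel(tag) like this.
The channel/consumer are managed by Spring; the only thing you should do with the consumer argument is ack or nack messages (and even that is rarely needed - it is better to let the container do the acks).
To stop/start the consumer, use the endpoint registry as described in the documentation.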
Containers created for annotations are not registered with the application context. You can obtain a collection of all containers by invoking getListenerContainers() on the RabbitListenerEndpointRegistry bean. You can then iterate over this collection, for example, to stop/start all containers or invoke the Lifecycle methods on the registry itself which will invoke the operations on each container.
For example, registry.stop() will stop all listeners.
You can also get a reference to an individual container using its id, using getListenerContainer(String id); for example registry.getListenerContainer("multi") for the container created by the snippet above.
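The registry approach quoted above can be sketched without a broker. This is only an illustration of the control flow, not Spring's implementation: `Container`, `FakeContainer`, `Registry`, `pause` and `resume` are hypothetical stand-ins defined here so the example runs on plain Java. In a real application you would inject `RabbitListenerEndpointRegistry` and call the same-named `getListenerContainer(String id)`, `getListenerContainers()`, `start()`, `stop()` and `isRunning()` methods on the Spring objects instead.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

public class ListenerPauseSketch {

    /** Stand-in for the Lifecycle methods that Spring's listener containers expose. */
    interface Container {
        void start();
        void stop();
        boolean isRunning();
    }

    /** Hypothetical in-memory container; a real one would open or cancel broker consumers. */
    static class FakeContainer implements Container {
        private boolean running = true;
        @Override public void start() { running = true; }
        @Override public void stop() { running = false; }
        @Override public boolean isRunning() { return running; }
    }

    /** Stand-in for RabbitListenerEndpointRegistry: containers keyed by listener id. */
    static class Registry {
        private final Map<String, Container> containers = new LinkedHashMap<>();
        void register(String id, Container container) { containers.put(id, container); }
        Container getListenerContainer(String id) { return containers.get(id); }
        Collection<Container> getListenerContainers() { return containers.values(); }
    }

    /** Stops every container by iterating over the registry's collection. */
    static void stopAll(Registry registry) {
        for (Container container : registry.getListenerContainers()) {
            container.stop();
        }
    }

    /** Stops one listener by id; returns false if the id is unknown or it is already stopped. */
    static boolean pause(Registry registry, String id) {
        Container container = registry.getListenerContainer(id);
        if (container == null || !container.isRunning()) {
            return false;
        }
        container.stop();
        return true;
    }

    /** Starts one listener by id; returns false if the id is unknown or it is already running. */
    static boolean resume(Registry registry, String id) {
        Container container = registry.getListenerContainer(id);
        if (container == null || container.isRunning()) {
            return false;
        }
        container.start();
        return true;
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.register("multi", new FakeContainer());
        registry.register("other", new FakeContainer());

        pause(registry, "multi");
        System.out.println("multi running after pause: "
                + registry.getListenerContainer("multi").isRunning());

        resume(registry, "multi");
        System.out.println("multi running after resume: "
                + registry.getListenerContainer("multi").isRunning());

        stopAll(registry);
        System.out.println("other running after stopAll: "
                + registry.getListenerContainer("other").isRunning());
    }
}
```

The point of the design is that the listener method never touches the channel or consumer tag; pausing and resuming go through the container, which owns the consumer.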
If you are using AMQP/Rabbit, you can try one of the following:
1) Prevent the listeners from starting at startup, in code:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    //
    // autoStartup = false prevents messages from being handled immediately.
    // You need to start each listener yourself.
    //
    factory.setAutoStartup(false);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    return factory;
}
2) Prevent the listeners from starting at startup, in application.yml/properties:
spring.rabbitmq.listener.auto-startup: false
spring.rabbitmq.listener.simple.auto-startup: false
3) Start/stop individual listeners
Give your @RabbitListener an id:
@RabbitListener(queues = "myQ", id = "myQ")
...
and:
@Autowired
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
MessageListenerContainer listener =
rabbitListenerEndpointRegistry.getListenerContainer("myQ");
...
listener.start();
...
listener.stop();