删除 RabbitMQ 消费者并在浏览器的 RabbitMQ 控制台中查看它
Remove RabbitMQ consumers and see it in Browser's RabbitMQ Console
免责声明:RabbitMq 中的 Noobie and/or Spring 集成 and/or Spring Cloud Stream。
我有以下 class:
@Component
public class RabbitMQChannelBindingFactory {
...
private org.springframework.cloud.stream.binder.rabbitRabbitMessageChannelBinder binder;
private org.springframework.cloud.stream.config.BindingServiceProperties bindingServiceProperties;
private org.springframework.cloud.stream.binding.BindingService bindingService;
private org.springframework.beans.factory.config.ConfigurableListableBeanFactory beanFactory;
private org.springframework.cloud.stream.binding.SubscribableChannelBindingTargetFactory bindingTargetFactory;
private org.springframework.amqp.rabbit.connection.ConnectionFactory rabbitConnectionFactory;
...
}
需要什么?
我有一个创建 Exchange+Queue+Consumer 的机制,我有一个销毁它们的机制。
交换器和队列将自动删除设置为 true。
问题是什么?
破坏所有这 3 个元素的继承机制不起作用。
它只删除 Exchange。队列没有被删除,因为它仍然有一个消费者,我也可以在我的应用程序中看到它。
尝试了什么?
我尝试使用 JVisualVM 获取客户标签的 String 实例,然后我走上层次结构删除消费者。
我已经更改了我的应用程序中的 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer 以便它首先由 class 加载程序加载。
我在里面添加了这样的东西,以便跟踪在我的应用程序中创建的所有消费者:
public class BlockingQueueConsumer {
...
public static List<BlockingQueueConsumer> all = new ArrayList<>();
public BlockingQueueConsumer(...) {
...
all.add(this);
...
}
...
}
完成上一步后,我在 RabbitMQChannelBindingFactory 中添加了另一个方法
class 为所有消费者调用取消方法,如下所示:
class RabbitMQChannelBindingFactory {
public void disconnect(...) {
BlockingQueueConsumer lastBlockingQueueConsumer =
BlockingQueueConsumer.all.get(BlockingQueueConsumer.all.size() - 1);
lastBlockingQueueConsumer.getConsumerTags()
.forEach(consumerTag -> basicCancel(lastBlockingQueueConsumer, consumerTag));
}
}
此时在加载了 RabbitMQ 控制台的浏览器上,我们可以看到 Queue 被删除(除了 Exchange 和 Consumer)。
问题是什么?
我找不到将 BlockingQueueConsumer 连接到自动装配属性的方法。
比如我试过
public void deleteRabbitMQConsumer() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
rabbitTemplate.execute(channel -> {
if (channel instanceof ChannelN) {
ChannelN channelN = (ChannelN) channel;
return true;
}
return false;
});
}
但是ChannelN里面好像没有消费者。
你能给我一个方向吗?首先需要了解什么?
或者有哪些资源可以帮助我?
或者有人尝试过使用此自动装配属性取消消费者的操作吗?
或者我需要添加其他自动装配属性吗?
我已经尝试了 解决方案。
解决方法
@Component
public class RabbitMQChannelBindingFactory {
...
private org.springframework.cloud.stream.binder.rabbitRabbitMessageChannelBinder binder;
private void connectAndDisconnectConsumer(...) {
...
Binding<MessageChannel> messageChannelBinding =
binder.bindConsumer(exchangeName, "", channel, consumerProperties);
... // receive messages
messageChannelBinding.stop();
...
}
}
和堆栈跟踪:
messageChannelBinding.stop();
DefaultBinding#stop
AbstractEndpoint#stop()
AmqpInboundChannelAdapter#doStop
AbstractMessageListenerContainer#stop()
AbstractMessageListenerContainer#doStop
AbstractMessageListenerContainer#shutdown
SimpleMessageListenerContainer#doShutdown
BlockingQueueConsumer#basicCancel(boolean)
投票结束。你一定不能那样滥用Java类系统,而是更专注于学习你使用的库。可能有人已经询问过您正在寻找的解决方案。正如 Gary 所说:Spring Cloud Stream 绑定上只有 stop()
,它将停止 MessageListenerContainer
,这反过来又会取消它在队列中的所有消费者.并且您的自动删除队列将从 RabbitMQ 中删除。没有理由破坏交易所。尽管您可以通过 AmqpAdmin.deleteExchange()
.
免责声明:RabbitMq 中的 Noobie and/or Spring 集成 and/or Spring Cloud Stream。
我有以下 class:
@Component
public class RabbitMQChannelBindingFactory {
...
private org.springframework.cloud.stream.binder.rabbitRabbitMessageChannelBinder binder;
private org.springframework.cloud.stream.config.BindingServiceProperties bindingServiceProperties;
private org.springframework.cloud.stream.binding.BindingService bindingService;
private org.springframework.beans.factory.config.ConfigurableListableBeanFactory beanFactory;
private org.springframework.cloud.stream.binding.SubscribableChannelBindingTargetFactory bindingTargetFactory;
private org.springframework.amqp.rabbit.connection.ConnectionFactory rabbitConnectionFactory;
...
}
需要什么?
我有一个创建 Exchange+Queue+Consumer 的机制,我有一个销毁它们的机制。 交换器和队列将自动删除设置为 true。
问题是什么?
破坏所有这 3 个元素的继承机制不起作用。
它只删除 Exchange。队列没有被删除,因为它仍然有一个消费者,我也可以在我的应用程序中看到它。
尝试了什么?
我尝试使用 JVisualVM 获取客户标签的 String 实例,然后我走上层次结构删除消费者。
我已经更改了我的应用程序中的 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer 以便它首先由 class 加载程序加载。
我在里面添加了这样的东西,以便跟踪在我的应用程序中创建的所有消费者:
public class BlockingQueueConsumer {
...
public static List<BlockingQueueConsumer> all = new ArrayList<>();
public BlockingQueueConsumer(...) {
...
all.add(this);
...
}
...
}
完成上一步后,我在 RabbitMQChannelBindingFactory 中添加了另一个方法 class 为所有消费者调用取消方法,如下所示:
class RabbitMQChannelBindingFactory {
public void disconnect(...) {
BlockingQueueConsumer lastBlockingQueueConsumer =
BlockingQueueConsumer.all.get(BlockingQueueConsumer.all.size() - 1);
lastBlockingQueueConsumer.getConsumerTags()
.forEach(consumerTag -> basicCancel(lastBlockingQueueConsumer, consumerTag));
}
}
此时在加载了 RabbitMQ 控制台的浏览器上,我们可以看到 Queue 被删除(除了 Exchange 和 Consumer)。
问题是什么?
我找不到将 BlockingQueueConsumer 连接到自动装配属性的方法。
比如我试过
public void deleteRabbitMQConsumer() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
rabbitTemplate.execute(channel -> {
if (channel instanceof ChannelN) {
ChannelN channelN = (ChannelN) channel;
return true;
}
return false;
});
}
但是ChannelN里面好像没有消费者。
你能给我一个方向吗?首先需要了解什么?
或者有哪些资源可以帮助我?
或者有人尝试过使用此自动装配属性取消消费者的操作吗?
或者我需要添加其他自动装配属性吗?
我已经尝试了 解决方案。
解决方法
@Component
public class RabbitMQChannelBindingFactory {
...
private org.springframework.cloud.stream.binder.rabbitRabbitMessageChannelBinder binder;
private void connectAndDisconnectConsumer(...) {
...
Binding<MessageChannel> messageChannelBinding =
binder.bindConsumer(exchangeName, "", channel, consumerProperties);
... // receive messages
messageChannelBinding.stop();
...
}
}
和堆栈跟踪:
messageChannelBinding.stop();
DefaultBinding#stop
AbstractEndpoint#stop()
AmqpInboundChannelAdapter#doStop
AbstractMessageListenerContainer#stop()
AbstractMessageListenerContainer#doStop
AbstractMessageListenerContainer#shutdown
SimpleMessageListenerContainer#doShutdown
BlockingQueueConsumer#basicCancel(boolean)
投票结束。你一定不能那样滥用Java类系统,而是更专注于学习你使用的库。可能有人已经询问过您正在寻找的解决方案。正如 Gary 所说:Spring Cloud Stream 绑定上只有 stop()
,它将停止 MessageListenerContainer
,这反过来又会取消它在队列中的所有消费者.并且您的自动删除队列将从 RabbitMQ 中删除。没有理由破坏交易所。尽管您可以通过 AmqpAdmin.deleteExchange()
.