删除 RabbitMQ 消费者并在浏览器的 RabbitMQ 控制台中查看它

Remove RabbitMQ consumers and see it in Browser's RabbitMQ Console

免责声明:RabbitMq 中的 Noobie and/or Spring 集成 and/or Spring Cloud Stream。

我有以下 class:

@Component
public class RabbitMQChannelBindingFactory {

...

  private org.springframework.cloud.stream.binder.rabbitRabbitMessageChannelBinder binder;
  private org.springframework.cloud.stream.config.BindingServiceProperties bindingServiceProperties;
  private org.springframework.cloud.stream.binding.BindingService bindingService;
  private org.springframework.beans.factory.config.ConfigurableListableBeanFactory beanFactory;
  private org.springframework.cloud.stream.binding.SubscribableChannelBindingTargetFactory bindingTargetFactory;
  private org.springframework.amqp.rabbit.connection.ConnectionFactory rabbitConnectionFactory;
...
}

需要什么?

我有一个创建 Exchange+Queue+Consumer 的机制,我有一个销毁它们的机制。 交换器和队列将自动删除设置为 true。

问题是什么?

破坏所有这 3 个元素的继承机制不起作用。

它只删除 Exchange。队列没有被删除,因为它仍然有一个消费者,我也可以在我的应用程序中看到它。

尝试了什么?

我尝试使用 JVisualVM 获取客户标签的 String 实例,然后我走上层次结构删除消费者。

我已经更改了我的应用程序中的 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer 以便它首先由 class 加载程序加载。

我在里面添加了这样的东西,以便跟踪在我的应用程序中创建的所有消费者:

public class BlockingQueueConsumer {
  ...
  public static List<BlockingQueueConsumer> all = new ArrayList<>();
  public BlockingQueueConsumer(...) {
     ...
     all.add(this);
     ...
  }
  ...
}

完成上一步后,我在 RabbitMQChannelBindingFactory 中添加了另一个方法 class 为所有消费者调用取消方法,如下所示:

class RabbitMQChannelBindingFactory {

   public void disconnect(...) {
      BlockingQueueConsumer lastBlockingQueueConsumer = 
          BlockingQueueConsumer.all.get(BlockingQueueConsumer.all.size() - 1);
      lastBlockingQueueConsumer.getConsumerTags()
          .forEach(consumerTag -> basicCancel(lastBlockingQueueConsumer, consumerTag));
   }
}

此时在加载了 RabbitMQ 控制台的浏览器上,我们可以看到 Queue 被删除(除了 Exchange 和 Consumer)。

问题是什么?

我找不到将 BlockingQueueConsumer 连接到自动装配属性的方法。

比如我试过

  public void deleteRabbitMQConsumer() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
    rabbitTemplate.execute(channel -> {
      if (channel instanceof ChannelN) {
        ChannelN channelN = (ChannelN) channel;
        return true;
      }
      return false;
    });
  }

但是ChannelN里面好像没有消费者。

你能给我一个方向吗?首先需要了解什么?

或者有哪些资源可以帮助我?

或者有人尝试过使用此自动装配属性取消消费者的操作吗?

或者我需要添加其他自动装配属性吗?

我已经尝试了 解决方案。

解决方法

@Component
public class RabbitMQChannelBindingFactory {

...

  private org.springframework.cloud.stream.binder.rabbitRabbitMessageChannelBinder binder;

  private void connectAndDisconnectConsumer(...) {
      ...
      Binding<MessageChannel> messageChannelBinding = 
         binder.bindConsumer(exchangeName, "", channel, consumerProperties);
      ... // receive messages
      messageChannelBinding.stop(); 
     ...
   }  

}

和堆栈跟踪:

messageChannelBinding.stop();
 DefaultBinding#stop
  AbstractEndpoint#stop()
   AmqpInboundChannelAdapter#doStop
    AbstractMessageListenerContainer#stop()
     AbstractMessageListenerContainer#doStop
      AbstractMessageListenerContainer#shutdown
      SimpleMessageListenerContainer#doShutdown
       BlockingQueueConsumer#basicCancel(boolean)

投票结束。你一定不能那样滥用Java类系统,而是更专注于学习你使用的库。可能有人已经询问过您正在寻找的解决方案。正如 Gary 所说:Spring Cloud Stream 绑定上只有 stop(),它将停止 MessageListenerContainer,这反过来又会取消它在队列中的所有消费者.并且您的自动删除队列将从 RabbitMQ 中删除。没有理由破坏交易所。尽管您可以通过 AmqpAdmin.deleteExchange().