MQTT subscription queues not being cleaned up

We are using ActiveMQ Artemis 2.8.1 with MQTT.

We have integrated ActiveMQ Artemis with Wildfly. Consider our server connecting with the client ID SAM and subscribing to 50 topics. When inspecting Artemis with JConsole, we can see that each client subscription produces a queue whose name follows the pattern <client-id>_<topic>. In my case, taking the topic com/api/output, the subscription queue name would be SAM_com/api/output. Likewise, there would be 50 other subscription queues following the same naming pattern (i.e. SAM_<topic>).

My findings

From my research, each queue stores the messages sent to the topic that the corresponding client subscription refers to. For example, when the same topic (say 1/2/3) is subscribed to by 3 different clients (say A, B & C), there will be 3 subscription queues (i.e. A_1/2/3, B_1/2/3 and C_1/2/3). So when a message is sent to the topic 1/2/3, Artemis places a copy of the message in each of the subscription queues A_1/2/3, B_1/2/3 and C_1/2/3.

The actual problem

Now the same client wants to connect to the broker with a different client ID (say TOM). My client initiates a disconnect, Artemis acknowledges the disconnect, and then my client connects to the broker with the new client ID (TOM) and subscribes to the same 50 topics. Now there are 100 subscription queues in total, 2 per topic (i.e. one per client ID - SAM & TOM). I found out why the SAM queues are retained: when initiating the connection we set cleanSession to false, so all of these subscription queues are durable and are therefore kept even after the client disconnects.
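
For illustration, here is a minimal sketch of such a connection using the Eclipse Paho MQTT client; the library choice, broker URL, and topic are assumptions, since the question doesn't say which client is used:

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

public class DurableSubscriber {
    public static void main(String[] args) throws Exception {
        // The client ID ("SAM") identifies the session on the broker.
        MqttClient client = new MqttClient("tcp://localhost:1883", "SAM");

        MqttConnectOptions opts = new MqttConnectOptions();
        // cleanSession=false tells the broker to keep the session state
        // (i.e. the durable subscription queues) across disconnects.
        opts.setCleanSession(false);
        client.connect(opts);

        // Each subscription creates (or re-attaches to) a durable queue,
        // e.g. SAM_com/api/output in the naming observed above.
        client.subscribe("com/api/output", 1);
    }
}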

When a message is sent to a topic it is placed in both queues (SAM & TOM). Our client is connected to the broker with client ID TOM, so the TOM queues have a consumer and the client consumes all of the messages from the TOM queues. The SAM queues, however, keep accumulating messages and eat up the JVM's heap space until the server dies.

The purpose of a durable queue is to retain messages even while the client is disconnected, but is there a way to purge the messages from a client's subscription queues if the client doesn't show up for a certain period of time, or when the client disconnects?

Our broker.xml:

<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

      <name>0.0.0.0</name>
      <security-enabled>true</security-enabled>


      <persistence-enabled>false</persistence-enabled>

      <!-- this could be ASYNCIO, MAPPED, NIO
           ASYNCIO: Linux Libaio
           MAPPED: mmap files
           NIO: Plain Java Files
       -->
      <journal-type>ASYNCIO</journal-type>

      <paging-directory>data/paging</paging-directory>

      <bindings-directory>data/bindings</bindings-directory>

      <journal-directory>data/journal</journal-directory>

      <large-messages-directory>data/large-messages</large-messages-directory>

      <journal-datasync>true</journal-datasync>

      <journal-min-files>2</journal-min-files>

      <journal-pool-files>10</journal-pool-files>

      <journal-file-size>10M</journal-file-size>
      
      <journal-buffer-timeout>64000</journal-buffer-timeout>


      <!--
        When using ASYNCIO, this will determine the writing queue depth for libaio.
       -->
      <journal-max-io>4096</journal-max-io>
      <!--
        You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
         <network-check-NIC>theNicName</network-check-NIC>
        -->

      <!--
        Use this to use an HTTP server to validate the network
         <network-check-URL-list>http://www.apache.org</network-check-URL-list> -->

      <!-- <network-check-period>10000</network-check-period> -->
      <!-- <network-check-timeout>1000</network-check-timeout> -->

      <!-- this is a comma separated list, no spaces, just DNS or IPs
           it should accept IPV6

           Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
                    Using IPs that could eventually disappear or be partially visible may defeat the purpose.
                    You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
      <!-- <network-check-list>10.0.0.1</network-check-list> -->

      <!-- use this to customize the ping used for ipv4 addresses -->
      <!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->

      <!-- use this to customize the ping used for ipv6 addresses -->
      <!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->




      <!-- how often we are looking for how many bytes are being used on the disk in ms -->
      <disk-scan-period>5000</disk-scan-period>

      <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
           that won't support flow control. -->
      <max-disk-usage>99</max-disk-usage>

      <!-- should the broker detect dead locks and other issues -->
      <critical-analyzer>true</critical-analyzer>

      <critical-analyzer-timeout>120000</critical-analyzer-timeout>

      <critical-analyzer-check-period>60000</critical-analyzer-check-period>

      <critical-analyzer-policy>HALT</critical-analyzer-policy>
      

      <acceptors>

         <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
         <!-- amqpCredits: The number of credits sent to AMQP producers -->
         <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->

         <!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
                    "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
                    See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->

         <!-- MQTT Acceptor -->
         <acceptor name="mqtt">tcp://0.0.0.0:${activeMQ-mqtt-port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,MQTT;useEpoll=true</acceptor>
         <acceptor name="netty-ssl-acceptor">tcp://0.0.0.0:${activeMQ-mqtts-port}?sslEnabled=true;keyStorePath=${activeMQ-keystore-path};keyStorePassword=${activeMQ-keyStore-password};protocols=CORE,MQTT</acceptor>

      </acceptors>


      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="amq"/>
         </security-setting>
      </security-settings>

      <address-settings>
         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="activemq.management#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
      </address-settings>

      <addresses>
         <address name="DLQ">
            <anycast>
               <queue name="DLQ" />
            </anycast>
         </address>
         <address name="ExpiryQueue">
            <anycast>
               <queue name="ExpiryQueue" />
            </anycast>
         </address>

      </addresses>

      <broker-plugins>
         <broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
            <property key="LOG_ALL_EVENTS" value="true"/>
            <property key="LOG_CONNECTION_EVENTS" value="true"/>
            <property key="LOG_SESSION_EVENTS" value="true"/>
            <property key="LOG_CONSUMER_EVENTS" value="true"/>
            <property key="LOG_DELIVERING_EVENTS" value="true"/>
            <property key="LOG_SENDING_EVENTS" value="true"/>
            <property key="LOG_INTERNAL_EVENTS" value="true"/>
         </broker-plugin>
      </broker-plugins>
   </core>
</configuration>

Answer

Whether or not a subscription is removed once the subscriber is done with it is entirely up to the subscriber. In other words, the MQTT client should unsubscribe the existing subscriber before creating a new one. The broker has no way of knowing whether the subscriber plans to reconnect later to fetch the messages from its subscription.
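
Continuing the hypothetical Paho sketch from the question above, the switch from SAM to TOM could be preceded by an explicit unsubscribe so the broker drops the SAM queues:

// Unsubscribe before disconnecting so the broker removes the durable
// SAM_* subscription queues instead of letting them accumulate messages.
client.unsubscribe(new String[] { "com/api/output" /* plus the other 49 topics */ });
client.disconnect();

// Only then reconnect under the new client ID.
MqttClient newClient = new MqttClient("tcp://localhost:1883", "TOM");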

If you really want the broker to remove the subscription queues, you can use the following address settings (a sketch follows the list):

  • <auto-delete-queues>: this must be true.
  • <auto-delete-queues-delay>: this is the delay, in milliseconds, between when the last consumer disconnects and when the broker deletes the subscription queue.
  • <auto-delete-queues-message-count>: this must be -1 so that any messages in the subscription queue are disregarded.
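
As a sketch, these three elements could be merged into the existing catch-all <address-setting match="#"> block in the broker.xml above (the 300000 ms delay is an illustrative value, not a recommendation):

         <address-setting match="#">
            <!-- existing settings (dead-letter-address, expiry-address, etc.) remain as-is -->
            <!-- delete a subscription queue once its last consumer has gone away -->
            <auto-delete-queues>true</auto-delete-queues>
            <!-- wait 5 minutes (300,000 ms) after the last consumer disconnects before deleting -->
            <auto-delete-queues-delay>300000</auto-delete-queues-delay>
            <!-- -1 = delete the queue regardless of how many messages it still holds -->
            <auto-delete-queues-message-count>-1</auto-delete-queues-message-count>
         </address-setting>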

To be clear, I recommend against this configuration because the broker may inadvertently delete legitimate messages. For example, if a subscriber is disconnected unintentionally (e.g. due to a network failure, hardware failure, JVM crash, etc.) for longer than the configured <auto-delete-queues-delay>, then the broker will delete its subscription queue and all of that subscriber's messages will effectively be lost.