消息在执行完成后写入队列,生产者已停止

Messages are written to the queue after complete execution and the producer has stopped

我遇到了经纪人(ActiveMQ-Artemis 版本 2.17.0)对我而言异常的行为。

消息量大,厂商快速发送时,部分消息执行完毕到达队列,厂商停止。当硬盘正常,而不是SSD时,这一点尤其明显。

作为示例,我使用以下 Apache Camel 2.25.3 路由发送消息

        <route autoStartup="false" factor:name="TCP SJMS2"
            factor:trace="false" id="route-e098a2c8-efd4-41dd-9c1d-57937663cfbe">
            <from id="endpoint-cef2b9db-e359-4fb0-aa4d-4afda4f79c10" uri="timer://init?delay=-1&amp;repeatCount=200000">
            </from>
            <setBody factor:component="SetBodyEndpoint"
                factor:custom-name="Установить тело сообщения"
                factor:guid="endpoint-361ea09a-9e8a-4f44-a428-05e27dbdf3b5" id="endpoint-361ea09a-9e8a-4f44-a428-05e27dbdf3b5">
                <simple>&lt;?xml version="1.0" encoding="utf-8"?&gt;
&lt;env:Envelope xmlns:env="http://www.w3.org/2003/05/soap-envelope"&gt;
  2 kB message body
&lt;/env:Envelope&gt;</simple>
            </setBody>
            <to id="endpoint-546af4a0-ebe5-4479-91f0-f6b6609264cc" uri="local2amq://TCP.IN?connectionFactory=%23tcpArtemisCF">
            </to>
        </route>

    <bean class="org.apache.camel.component.sjms2.Sjms2Component" id="local2amq">
        <property name="connectionFactory" ref="artemisConnectionFactory"/>
        <property name="connectionCount" value="5"/>
    </bean>

    <bean
     class="org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory"
        factor:bean-type="ARTEMIS_CONNECTION_FACTORY" id="tcpArtemisCF" name="tcpArtemisCF">
        <property name="brokerURL" value="(tcp://localhost:61717)?blockOnDurableSend=false"/>
    </bean>

这条路由很快,发送了 200,000 条消息,速度在 6,000 左右 s/s。

但是如果走完这条路,再去broker的队列,队列里只有80000条左右的消息,剩下的会以200-2000条的速度慢慢添加s/s

我在常规ActiveMQ中没有看到这样的行为,路由完成后,所有消息都在队列中。

主要问题。

  1. 这种行为是否常见且符合预期?它受什么参数调节?

  2. 如何查看已发送但尚未在队列中的消息数?

  3. 如何实现该行为以便在路由终止时将所有消息写入队列?

经纪人配置

<?xml version='1.0'?>
<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

      <name>0.0.0.0</name>


      <persistence-enabled>true</persistence-enabled>

      <journal-type>NIO</journal-type>

      <paging-directory>data/paging</paging-directory>

      <bindings-directory>data/bindings</bindings-directory>

      <journal-directory>data/journal</journal-directory>

      <large-messages-directory>data/large-messages</large-messages-directory>

      <journal-datasync>true</journal-datasync>

      <journal-min-files>2</journal-min-files>

      <journal-pool-files>10</journal-pool-files>

      <journal-device-block-size>4096</journal-device-block-size>

      <journal-file-size>10M</journal-file-size>
      
      <journal-buffer-timeout>836000</journal-buffer-timeout>

      <journal-max-io>1</journal-max-io>

      <disk-scan-period>5000</disk-scan-period>

      <max-disk-usage>100</max-disk-usage>

      <critical-analyzer>true</critical-analyzer>

      <critical-analyzer-timeout>120000</critical-analyzer-timeout>

      <critical-analyzer-check-period>60000</critical-analyzer-check-period>

      <critical-analyzer-policy>HALT</critical-analyzer-policy>

      
      <page-sync-timeout>836000</page-sync-timeout>


      <acceptors>
         <acceptor name="artemis">tcp://0.0.0.0:61717</acceptor>
      </acceptors>


      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="amq"/>
         </security-setting>
      </security-settings>

      <address-settings>
         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="activemq.management#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
      </address-settings>

      <addresses>
         <address name="DLQ">
            <anycast>
               <queue name="DLQ" />
            </anycast>
         </address>
         <address name="ExpiryQueue">
            <anycast>
               <queue name="ExpiryQueue" />
            </anycast>
         </address>

      </addresses>
   </core>
</configuration>

经纪人日志

17:58:19,887 INFO  [org.apache.activemq.artemis.integration.bootstrap] AMQ101000: Starting ActiveMQ Artemis Server
2021-04-05 17:58:19,926 INFO  [org.apache.activemq.artemis.core.server] AMQ221000: live Message Broker is starting with configuration Broker Configuration (clustered=false,journalDirectory=data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/large-messages,pagingDirectory=data/paging)
2021-04-05 17:58:19,958 INFO  [org.apache.activemq.artemis.core.server] AMQ221013: Using NIO Journal
2021-04-05 17:58:20,038 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-server]. Adding protocol support for: CORE
2021-04-05 17:58:20,039 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP
2021-04-05 17:58:20,041 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-hornetq-protocol]. Adding protocol support for: HORNETQ
2021-04-05 17:58:20,047 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-mqtt-protocol]. Adding protocol support for: MQTT
2021-04-05 17:58:20,047 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-openwire-protocol]. Adding protocol support for: OPENWIRE
2021-04-05 17:58:20,048 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-stomp-protocol]. Adding protocol support for: STOMP
2021-04-05 17:58:20,163 INFO  [org.apache.activemq.artemis.core.server] AMQ221034: Waiting indefinitely to obtain live lock
2021-04-05 17:58:20,163 INFO  [org.apache.activemq.artemis.core.server] AMQ221035: Live Server Obtained live lock
2021-04-05 17:58:21,867 INFO  [org.apache.activemq.artemis.core.server] AMQ221080: Deploying address DLQ supporting [ANYCAST]
2021-04-05 17:58:21,869 INFO  [org.apache.activemq.artemis.core.server] AMQ221003: Deploying ANYCAST queue DLQ on address DLQ
2021-04-05 17:58:21,876 INFO  [org.apache.activemq.artemis.core.server] AMQ221080: Deploying address ExpiryQueue supporting [ANYCAST]
2021-04-05 17:58:21,877 INFO  [org.apache.activemq.artemis.core.server] AMQ221003: Deploying ANYCAST queue ExpiryQueue on address ExpiryQueue
2021-04-05 17:58:22,686 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: Started NIO Acceptor at 0.0.0.0:61717 for protocols [CORE,MQTT,AMQP,HORNETQ,STOMP,OPENWIRE]
2021-04-05 17:58:22,797 INFO  [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
2021-04-05 17:58:22,798 INFO  [org.apache.activemq.artemis.core.server] AMQ221001: Apache ActiveMQ Artemis Message Broker version 2.17.0 [0.0.0.0, nodeID=024cff0e-8ff2-11eb-8968-c0b6f9f8ba29]
2021-04-05 17:58:23,113 INFO  [org.apache.activemq.hawtio.branding.PluginContextListener] Initialized activemq-branding plugin
2021-04-05 17:58:23,250 INFO  [org.apache.activemq.hawtio.plugin.PluginContextListener] Initialized artemis-plugin plugin
2021-04-05 17:58:24,336 INFO  [io.hawt.HawtioContextListener] Initialising hawtio services
2021-04-05 17:58:24,349 INFO  [io.hawt.system.ConfigManager] Configuration will be discovered via system properties
2021-04-05 17:58:24,352 INFO  [io.hawt.jmx.JmxTreeWatcher] Welcome to Hawtio 2.11.0
2021-04-05 17:58:24,359 INFO  [io.hawt.web.auth.AuthenticationConfiguration] Starting hawtio authentication filter, JAAS realm: "activemq" authorized role(s): "amq" role principal classes: "org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal"
2021-04-05 17:58:24,378 INFO  [io.hawt.web.proxy.ProxyServlet] Proxy servlet is disabled
2021-04-05 17:58:24,385 INFO  [io.hawt.web.servlets.JolokiaConfiguredAgentServlet] Jolokia overridden property: [key=policyLocation, value=file:/D:/Documents/apache-artemis-2.17.0/bin/emptyNew/etc/\jolokia-access.xml]
2021-04-05 17:58:24,712 INFO  [org.apache.activemq.artemis] AMQ241001: HTTP Server started at http://localhost:8161
2021-04-05 17:58:24,713 INFO  [org.apache.activemq.artemis] AMQ241002: Artemis Jolokia REST API available at http://localhost:8161/console/jolokia
2021-04-05 17:58:24,714 INFO  [org.apache.activemq.artemis] AMQ241004: Artemis Console available at http://localhost:8161/console
2021-04-05 17:59:08,763 INFO  [io.hawt.web.auth.LoginServlet] Hawtio login is using 1800 sec. HttpSession timeout
2021-04-05 17:59:10,512 INFO  [io.hawt.web.auth.LoginServlet] Logging in user: root
2021-04-05 17:59:11,206 INFO  [io.hawt.web.auth.keycloak.KeycloakServlet] Keycloak integration is disabled

更新

常规 ActiveMQ 版本 5.15.11 的数据

骆驼路线

        <route autoStartup="false" factor:name="TCP SJMS2"
            factor:trace="false" id="route-e098a2c8-efd4-41dd-9c1d-57937663cfbe">
            <from id="endpoint-cef2b9db-e359-4fb0-aa4d-4afda4f79c10" uri="timer://init?delay=-1&amp;repeatCount=200000">
            </from>
            <setBody factor:component="SetBodyEndpoint"
                factor:custom-name="Установить тело сообщения"
                factor:guid="endpoint-361ea09a-9e8a-4f44-a428-05e27dbdf3b5" id="endpoint-361ea09a-9e8a-4f44-a428-05e27dbdf3b5">
                <simple>&lt;?xml version="1.0" encoding="utf-8"?&gt;
&lt;env:Envelope xmlns:env="http://www.w3.org/2003/05/soap-envelope"&gt;
  2 kB message body
&lt;/env:Envelope&gt;</simple>
            </setBody>
            <to id="endpoint-f697cad9-90db-47b9-877f-4189febdd010" uri="localmq://PERF.IN?connectionFactory=%23tcpActiveMQCF">
            </to>
        </route>

    <bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="localmq">
        <property name="configuration" ref="jmsConfig"/>
    </bean>

     <bean class="org.apache.camel.component.jms.JmsConfiguration" id="jmsConfig">
        <property name="asyncStartListener" value="true"/>
        <property name="cacheLevelName" value="CACHE_CONSUMER"/> 
        <property name="preserveMessageQos" value="true"/>  
    </bean> 
    <bean class="org.apache.activemq.pool.PooledConnectionFactory"
        destroy-method="stop" factor:bean-type="AMQ_CONNECTION_FACTORY"
        id="tcpActiveMQCF" init-method="start" name="tcpActiveMQCF">
        <property name="maxConnections" value="1"/>
        <property name="maximumActiveSessionPerConnection" value="15"/>
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory"
                id="tcpActiveMQCF_connection" name="tcpActiveMQCF_connection">
                <property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=false"/>
            </bean>
        </property>
    </bean>

对于这条路线,我的速度约为 2500 s/s,并且消息会立即写入队列,而更改参数 jms.useAsyncSend 实际上不会影响我的性能。

这种行为在发送非持久消息时是预期的,因为非持久消息是以非阻塞方式发送的。目前尚不清楚您是否正在发送非持久消息,但您还在客户端的 URL 上设置了 blockOnDurableSend=false,因此即使是持久消息也将以非阻塞方式发送。

从代理的角度来看,消息实际上并未到达,因此无法查看已发送但尚未进入队列的消息数量。

如果您想确保当 Camel 路由终止时所有消息都写入队列,那么您应该发送持久消息并设置 blockOnDurableSend=true(这是默认值)。

请记住,阻塞 会根据您的硬盘速度降低性能(可能会大幅降低)。这是因为客户端必须等待代理发送的每条消息的响应,并且对于代理收到的每条消息,它必须将该消息保存到磁盘并等待硬盘同步,然后再发送响应返回给客户端。所以,如果你的硬盘不能快速同步,客户端相对来说要等很久。

影响此行为的配置参数之一是 journal-buffer-timeout。该值是在首次创建代理实例时自动计算和设置的。您将看到此记录的证据,例如:

Auto tuning journal ...
done! Your system can make 250 writes per millisecond, your journal-buffer-timeout will be 4000

在您的情况下,journal-buffer-timeout 已设置为 836000,这非常慢(超时时间越长,磁盘越慢)。这意味着您的磁盘每毫秒只能进行 1.2 次写入。如果您认为此值有误,您可以 运行 artemis perf-journal 命令重新计算并相应地更新配置。

给你一个比较,我的 journal-buffer-timeout4000,我可以 运行 使用 ActiveMQ Artemis 的 artemis producer --protocol amqp 命令,它将在更少的时间内发送 1,000 条持久消息几 运行 秒后超过 700 毫秒。如果我使用 --non-persistent 标志,持续时间会下降到大约 200 毫秒。

如果我对 ActiveMQ 5.16.0 的默认安装执行相同的测试,则分别需要大约 900 和 200 毫秒,考虑到测试的性质,这并不奇怪。

值得注意的是,ActiveMQ 5.x 具有 jms.useAsyncSend 参数,其功能等同于 Artemis 中的 blockOnDurableSendblockOnNonDurableSend。但是,如果您使用它,您不太可能看到如此大的差异,因为 5.x 代理有很多固有的内部阻塞,而 Artemis 是从头开始编写的,完全是非阻塞的。因此,Artemis 的潜在性能上限远高于 5.x,这也是 Artemis 存在的主要原因之一。

请记住,无论是否阻止,您实际上只是分别以可靠性换取速度。通过不阻塞,您是在告诉客户“解雇后忘记”。根据定义,客户端不知道消息是否真的被代理成功接收。换句话说,从客户端的角度来看,以非阻塞方式发送消息本质上是不可靠的。这是您为速度所做的基本权衡。 JMS 2 添加了 javax.jms.CompletionListener 以帮助稍微缓解这种情况,但任何 Camel JMS 组件都不太可能智能地使用它。