消息在执行完成后写入队列,生产者已停止
Messages are written to the queue after complete execution and the producer has stopped
我遇到了经纪人(ActiveMQ-Artemis 版本 2.17.0)对我而言异常的行为。
消息量大,厂商快速发送时,部分消息执行完毕到达队列,厂商停止。当硬盘正常,而不是SSD时,这一点尤其明显。
作为示例,我使用以下 Apache Camel 2.25.3 路由发送消息
<route autoStartup="false" factor:name="TCP SJMS2"
factor:trace="false" id="route-e098a2c8-efd4-41dd-9c1d-57937663cfbe">
<from id="endpoint-cef2b9db-e359-4fb0-aa4d-4afda4f79c10" uri="timer://init?delay=-1&repeatCount=200000">
</from>
<setBody factor:component="SetBodyEndpoint"
factor:custom-name="Установить тело сообщения"
factor:guid="endpoint-361ea09a-9e8a-4f44-a428-05e27dbdf3b5" id="endpoint-361ea09a-9e8a-4f44-a428-05e27dbdf3b5">
<simple><?xml version="1.0" encoding="utf-8"?>
<env:Envelope xmlns:env="http://www.w3.org/2003/05/soap-envelope">
2 kB message body
</env:Envelope></simple>
</setBody>
<to id="endpoint-546af4a0-ebe5-4479-91f0-f6b6609264cc" uri="local2amq://TCP.IN?connectionFactory=%23tcpArtemisCF">
</to>
</route>
<bean class="org.apache.camel.component.sjms2.Sjms2Component" id="local2amq">
<property name="connectionFactory" ref="artemisConnectionFactory"/>
<property name="connectionCount" value="5"/>
</bean>
<bean
class="org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory"
factor:bean-type="ARTEMIS_CONNECTION_FACTORY" id="tcpArtemisCF" name="tcpArtemisCF">
<property name="brokerURL" value="(tcp://localhost:61717)?blockOnDurableSend=false"/>
</bean>
这条路由很快,发送了 200,000 条消息,速度在 6,000 左右 s/s。
但是如果走完这条路,再去broker的队列,队列里只有80000条左右的消息,剩下的会以200-2000条的速度慢慢添加s/s
我在常规ActiveMQ中没有看到这样的行为,路由完成后,所有消息都在队列中。
主要问题。
这种行为是否常见且符合预期?它受什么参数调节?
如何查看已发送但尚未在队列中的消息数?
如何实现该行为以便在路由终止时将所有消息写入队列?
经纪人配置
<?xml version='1.0'?>
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>true</persistence-enabled>
<journal-type>NIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<journal-buffer-timeout>836000</journal-buffer-timeout>
<journal-max-io>1</journal-max-io>
<disk-scan-period>5000</disk-scan-period>
<max-disk-usage>100</max-disk-usage>
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>836000</page-sync-timeout>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61717</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
</addresses>
</core>
</configuration>
经纪人日志
17:58:19,887 INFO [org.apache.activemq.artemis.integration.bootstrap] AMQ101000: Starting ActiveMQ Artemis Server
2021-04-05 17:58:19,926 INFO [org.apache.activemq.artemis.core.server] AMQ221000: live Message Broker is starting with configuration Broker Configuration (clustered=false,journalDirectory=data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/large-messages,pagingDirectory=data/paging)
2021-04-05 17:58:19,958 INFO [org.apache.activemq.artemis.core.server] AMQ221013: Using NIO Journal
2021-04-05 17:58:20,038 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-server]. Adding protocol support for: CORE
2021-04-05 17:58:20,039 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP
2021-04-05 17:58:20,041 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-hornetq-protocol]. Adding protocol support for: HORNETQ
2021-04-05 17:58:20,047 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-mqtt-protocol]. Adding protocol support for: MQTT
2021-04-05 17:58:20,047 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-openwire-protocol]. Adding protocol support for: OPENWIRE
2021-04-05 17:58:20,048 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-stomp-protocol]. Adding protocol support for: STOMP
2021-04-05 17:58:20,163 INFO [org.apache.activemq.artemis.core.server] AMQ221034: Waiting indefinitely to obtain live lock
2021-04-05 17:58:20,163 INFO [org.apache.activemq.artemis.core.server] AMQ221035: Live Server Obtained live lock
2021-04-05 17:58:21,867 INFO [org.apache.activemq.artemis.core.server] AMQ221080: Deploying address DLQ supporting [ANYCAST]
2021-04-05 17:58:21,869 INFO [org.apache.activemq.artemis.core.server] AMQ221003: Deploying ANYCAST queue DLQ on address DLQ
2021-04-05 17:58:21,876 INFO [org.apache.activemq.artemis.core.server] AMQ221080: Deploying address ExpiryQueue supporting [ANYCAST]
2021-04-05 17:58:21,877 INFO [org.apache.activemq.artemis.core.server] AMQ221003: Deploying ANYCAST queue ExpiryQueue on address ExpiryQueue
2021-04-05 17:58:22,686 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started NIO Acceptor at 0.0.0.0:61717 for protocols [CORE,MQTT,AMQP,HORNETQ,STOMP,OPENWIRE]
2021-04-05 17:58:22,797 INFO [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
2021-04-05 17:58:22,798 INFO [org.apache.activemq.artemis.core.server] AMQ221001: Apache ActiveMQ Artemis Message Broker version 2.17.0 [0.0.0.0, nodeID=024cff0e-8ff2-11eb-8968-c0b6f9f8ba29]
2021-04-05 17:58:23,113 INFO [org.apache.activemq.hawtio.branding.PluginContextListener] Initialized activemq-branding plugin
2021-04-05 17:58:23,250 INFO [org.apache.activemq.hawtio.plugin.PluginContextListener] Initialized artemis-plugin plugin
2021-04-05 17:58:24,336 INFO [io.hawt.HawtioContextListener] Initialising hawtio services
2021-04-05 17:58:24,349 INFO [io.hawt.system.ConfigManager] Configuration will be discovered via system properties
2021-04-05 17:58:24,352 INFO [io.hawt.jmx.JmxTreeWatcher] Welcome to Hawtio 2.11.0
2021-04-05 17:58:24,359 INFO [io.hawt.web.auth.AuthenticationConfiguration] Starting hawtio authentication filter, JAAS realm: "activemq" authorized role(s): "amq" role principal classes: "org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal"
2021-04-05 17:58:24,378 INFO [io.hawt.web.proxy.ProxyServlet] Proxy servlet is disabled
2021-04-05 17:58:24,385 INFO [io.hawt.web.servlets.JolokiaConfiguredAgentServlet] Jolokia overridden property: [key=policyLocation, value=file:/D:/Documents/apache-artemis-2.17.0/bin/emptyNew/etc/\jolokia-access.xml]
2021-04-05 17:58:24,712 INFO [org.apache.activemq.artemis] AMQ241001: HTTP Server started at http://localhost:8161
2021-04-05 17:58:24,713 INFO [org.apache.activemq.artemis] AMQ241002: Artemis Jolokia REST API available at http://localhost:8161/console/jolokia
2021-04-05 17:58:24,714 INFO [org.apache.activemq.artemis] AMQ241004: Artemis Console available at http://localhost:8161/console
2021-04-05 17:59:08,763 INFO [io.hawt.web.auth.LoginServlet] Hawtio login is using 1800 sec. HttpSession timeout
2021-04-05 17:59:10,512 INFO [io.hawt.web.auth.LoginServlet] Logging in user: root
2021-04-05 17:59:11,206 INFO [io.hawt.web.auth.keycloak.KeycloakServlet] Keycloak integration is disabled
更新
常规 ActiveMQ 版本 5.15.11 的数据
骆驼路线
<route autoStartup="false" factor:name="TCP SJMS2"
factor:trace="false" id="route-e098a2c8-efd4-41dd-9c1d-57937663cfbe">
<from id="endpoint-cef2b9db-e359-4fb0-aa4d-4afda4f79c10" uri="timer://init?delay=-1&repeatCount=200000">
</from>
<setBody factor:component="SetBodyEndpoint"
factor:custom-name="Установить тело сообщения"
factor:guid="endpoint-361ea09a-9e8a-4f44-a428-05e27dbdf3b5" id="endpoint-361ea09a-9e8a-4f44-a428-05e27dbdf3b5">
<simple><?xml version="1.0" encoding="utf-8"?>
<env:Envelope xmlns:env="http://www.w3.org/2003/05/soap-envelope">
2 kB message body
</env:Envelope></simple>
</setBody>
<to id="endpoint-f697cad9-90db-47b9-877f-4189febdd010" uri="localmq://PERF.IN?connectionFactory=%23tcpActiveMQCF">
</to>
</route>
<bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="localmq">
<property name="configuration" ref="jmsConfig"/>
</bean>
<bean class="org.apache.camel.component.jms.JmsConfiguration" id="jmsConfig">
<property name="asyncStartListener" value="true"/>
<property name="cacheLevelName" value="CACHE_CONSUMER"/>
<property name="preserveMessageQos" value="true"/>
</bean>
<bean class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop" factor:bean-type="AMQ_CONNECTION_FACTORY"
id="tcpActiveMQCF" init-method="start" name="tcpActiveMQCF">
<property name="maxConnections" value="1"/>
<property name="maximumActiveSessionPerConnection" value="15"/>
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory"
id="tcpActiveMQCF_connection" name="tcpActiveMQCF_connection">
<property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=false"/>
</bean>
</property>
</bean>
对于这条路线,我的速度约为 2500 s/s,并且消息会立即写入队列,而更改参数 jms.useAsyncSend
实际上不会影响我的性能。
这种行为在发送非持久消息时是预期的,因为非持久消息是以非阻塞方式发送的。目前尚不清楚您是否正在发送非持久消息,但您还在客户端的 URL 上设置了 blockOnDurableSend=false
,因此即使是持久消息也将以非阻塞方式发送。
从代理的角度来看,消息实际上并未到达,因此无法查看已发送但尚未进入队列的消息数量。
如果您想确保当 Camel 路由终止时所有消息都写入队列,那么您应该发送持久消息并设置 blockOnDurableSend=true
(这是默认值)。
请记住,阻塞 会根据您的硬盘速度降低性能(可能会大幅降低)。这是因为客户端必须等待代理发送的每条消息的响应,并且对于代理收到的每条消息,它必须将该消息保存到磁盘并等待硬盘同步,然后再发送响应返回给客户端。所以,如果你的硬盘不能快速同步,客户端相对来说要等很久。
影响此行为的配置参数之一是 journal-buffer-timeout
。该值是在首次创建代理实例时自动计算和设置的。您将看到此记录的证据,例如:
Auto tuning journal ...
done! Your system can make 250 writes per millisecond, your journal-buffer-timeout will be 4000
在您的情况下,journal-buffer-timeout
已设置为 836000
,这非常慢(超时时间越长,磁盘越慢)。这意味着您的磁盘每毫秒只能进行 1.2 次写入。如果您认为此值有误,您可以 运行 artemis perf-journal
命令重新计算并相应地更新配置。
给你一个比较,我的 journal-buffer-timeout
是 4000
,我可以 运行 使用 ActiveMQ Artemis 的 artemis producer --protocol amqp
命令,它将在更少的时间内发送 1,000 条持久消息几 运行 秒后超过 700 毫秒。如果我使用 --non-persistent
标志,持续时间会下降到大约 200 毫秒。
如果我对 ActiveMQ 5.16.0 的默认安装执行相同的测试,则分别需要大约 900 和 200 毫秒,考虑到测试的性质,这并不奇怪。
值得注意的是,ActiveMQ 5.x 具有 jms.useAsyncSend
参数,其功能等同于 Artemis 中的 blockOnDurableSend
和 blockOnNonDurableSend
。但是,如果您使用它,您不太可能看到如此大的差异,因为 5.x 代理有很多固有的内部阻塞,而 Artemis 是从头开始编写的,完全是非阻塞的。因此,Artemis 的潜在性能上限远高于 5.x,这也是 Artemis 存在的主要原因之一。
请记住,无论是否阻止,您实际上只是分别以可靠性换取速度。通过不阻塞,您是在告诉客户“解雇后忘记”。根据定义,客户端不知道消息是否真的被代理成功接收。换句话说,从客户端的角度来看,以非阻塞方式发送消息本质上是不可靠的。这是您为速度所做的基本权衡。 JMS 2 添加了 javax.jms.CompletionListener
以帮助稍微缓解这种情况,但任何 Camel JMS 组件都不太可能智能地使用它。
我遇到了经纪人(ActiveMQ-Artemis 版本 2.17.0)对我而言异常的行为。
消息量大,厂商快速发送时,部分消息执行完毕到达队列,厂商停止。当硬盘正常,而不是SSD时,这一点尤其明显。
作为示例,我使用以下 Apache Camel 2.25.3 路由发送消息
<route autoStartup="false" factor:name="TCP SJMS2"
factor:trace="false" id="route-e098a2c8-efd4-41dd-9c1d-57937663cfbe">
<from id="endpoint-cef2b9db-e359-4fb0-aa4d-4afda4f79c10" uri="timer://init?delay=-1&repeatCount=200000">
</from>
<setBody factor:component="SetBodyEndpoint"
factor:custom-name="Установить тело сообщения"
factor:guid="endpoint-361ea09a-9e8a-4f44-a428-05e27dbdf3b5" id="endpoint-361ea09a-9e8a-4f44-a428-05e27dbdf3b5">
<simple><?xml version="1.0" encoding="utf-8"?>
<env:Envelope xmlns:env="http://www.w3.org/2003/05/soap-envelope">
2 kB message body
</env:Envelope></simple>
</setBody>
<to id="endpoint-546af4a0-ebe5-4479-91f0-f6b6609264cc" uri="local2amq://TCP.IN?connectionFactory=%23tcpArtemisCF">
</to>
</route>
<bean class="org.apache.camel.component.sjms2.Sjms2Component" id="local2amq">
<property name="connectionFactory" ref="artemisConnectionFactory"/>
<property name="connectionCount" value="5"/>
</bean>
<bean
class="org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory"
factor:bean-type="ARTEMIS_CONNECTION_FACTORY" id="tcpArtemisCF" name="tcpArtemisCF">
<property name="brokerURL" value="(tcp://localhost:61717)?blockOnDurableSend=false"/>
</bean>
这条路由很快,发送了 200,000 条消息,速度在 6,000 左右 s/s。
但是如果走完这条路,再去broker的队列,队列里只有80000条左右的消息,剩下的会以200-2000条的速度慢慢添加s/s
我在常规ActiveMQ中没有看到这样的行为,路由完成后,所有消息都在队列中。
主要问题。
这种行为是否常见且符合预期?它受什么参数调节?
如何查看已发送但尚未在队列中的消息数?
如何实现该行为以便在路由终止时将所有消息写入队列?
经纪人配置
<?xml version='1.0'?>
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>true</persistence-enabled>
<journal-type>NIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<journal-buffer-timeout>836000</journal-buffer-timeout>
<journal-max-io>1</journal-max-io>
<disk-scan-period>5000</disk-scan-period>
<max-disk-usage>100</max-disk-usage>
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>836000</page-sync-timeout>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61717</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
</addresses>
</core>
</configuration>
经纪人日志
17:58:19,887 INFO [org.apache.activemq.artemis.integration.bootstrap] AMQ101000: Starting ActiveMQ Artemis Server
2021-04-05 17:58:19,926 INFO [org.apache.activemq.artemis.core.server] AMQ221000: live Message Broker is starting with configuration Broker Configuration (clustered=false,journalDirectory=data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/large-messages,pagingDirectory=data/paging)
2021-04-05 17:58:19,958 INFO [org.apache.activemq.artemis.core.server] AMQ221013: Using NIO Journal
2021-04-05 17:58:20,038 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-server]. Adding protocol support for: CORE
2021-04-05 17:58:20,039 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP
2021-04-05 17:58:20,041 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-hornetq-protocol]. Adding protocol support for: HORNETQ
2021-04-05 17:58:20,047 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-mqtt-protocol]. Adding protocol support for: MQTT
2021-04-05 17:58:20,047 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-openwire-protocol]. Adding protocol support for: OPENWIRE
2021-04-05 17:58:20,048 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-stomp-protocol]. Adding protocol support for: STOMP
2021-04-05 17:58:20,163 INFO [org.apache.activemq.artemis.core.server] AMQ221034: Waiting indefinitely to obtain live lock
2021-04-05 17:58:20,163 INFO [org.apache.activemq.artemis.core.server] AMQ221035: Live Server Obtained live lock
2021-04-05 17:58:21,867 INFO [org.apache.activemq.artemis.core.server] AMQ221080: Deploying address DLQ supporting [ANYCAST]
2021-04-05 17:58:21,869 INFO [org.apache.activemq.artemis.core.server] AMQ221003: Deploying ANYCAST queue DLQ on address DLQ
2021-04-05 17:58:21,876 INFO [org.apache.activemq.artemis.core.server] AMQ221080: Deploying address ExpiryQueue supporting [ANYCAST]
2021-04-05 17:58:21,877 INFO [org.apache.activemq.artemis.core.server] AMQ221003: Deploying ANYCAST queue ExpiryQueue on address ExpiryQueue
2021-04-05 17:58:22,686 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started NIO Acceptor at 0.0.0.0:61717 for protocols [CORE,MQTT,AMQP,HORNETQ,STOMP,OPENWIRE]
2021-04-05 17:58:22,797 INFO [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
2021-04-05 17:58:22,798 INFO [org.apache.activemq.artemis.core.server] AMQ221001: Apache ActiveMQ Artemis Message Broker version 2.17.0 [0.0.0.0, nodeID=024cff0e-8ff2-11eb-8968-c0b6f9f8ba29]
2021-04-05 17:58:23,113 INFO [org.apache.activemq.hawtio.branding.PluginContextListener] Initialized activemq-branding plugin
2021-04-05 17:58:23,250 INFO [org.apache.activemq.hawtio.plugin.PluginContextListener] Initialized artemis-plugin plugin
2021-04-05 17:58:24,336 INFO [io.hawt.HawtioContextListener] Initialising hawtio services
2021-04-05 17:58:24,349 INFO [io.hawt.system.ConfigManager] Configuration will be discovered via system properties
2021-04-05 17:58:24,352 INFO [io.hawt.jmx.JmxTreeWatcher] Welcome to Hawtio 2.11.0
2021-04-05 17:58:24,359 INFO [io.hawt.web.auth.AuthenticationConfiguration] Starting hawtio authentication filter, JAAS realm: "activemq" authorized role(s): "amq" role principal classes: "org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal"
2021-04-05 17:58:24,378 INFO [io.hawt.web.proxy.ProxyServlet] Proxy servlet is disabled
2021-04-05 17:58:24,385 INFO [io.hawt.web.servlets.JolokiaConfiguredAgentServlet] Jolokia overridden property: [key=policyLocation, value=file:/D:/Documents/apache-artemis-2.17.0/bin/emptyNew/etc/\jolokia-access.xml]
2021-04-05 17:58:24,712 INFO [org.apache.activemq.artemis] AMQ241001: HTTP Server started at http://localhost:8161
2021-04-05 17:58:24,713 INFO [org.apache.activemq.artemis] AMQ241002: Artemis Jolokia REST API available at http://localhost:8161/console/jolokia
2021-04-05 17:58:24,714 INFO [org.apache.activemq.artemis] AMQ241004: Artemis Console available at http://localhost:8161/console
2021-04-05 17:59:08,763 INFO [io.hawt.web.auth.LoginServlet] Hawtio login is using 1800 sec. HttpSession timeout
2021-04-05 17:59:10,512 INFO [io.hawt.web.auth.LoginServlet] Logging in user: root
2021-04-05 17:59:11,206 INFO [io.hawt.web.auth.keycloak.KeycloakServlet] Keycloak integration is disabled
更新
常规 ActiveMQ 版本 5.15.11 的数据
骆驼路线
<route autoStartup="false" factor:name="TCP SJMS2"
factor:trace="false" id="route-e098a2c8-efd4-41dd-9c1d-57937663cfbe">
<from id="endpoint-cef2b9db-e359-4fb0-aa4d-4afda4f79c10" uri="timer://init?delay=-1&repeatCount=200000">
</from>
<setBody factor:component="SetBodyEndpoint"
factor:custom-name="Установить тело сообщения"
factor:guid="endpoint-361ea09a-9e8a-4f44-a428-05e27dbdf3b5" id="endpoint-361ea09a-9e8a-4f44-a428-05e27dbdf3b5">
<simple><?xml version="1.0" encoding="utf-8"?>
<env:Envelope xmlns:env="http://www.w3.org/2003/05/soap-envelope">
2 kB message body
</env:Envelope></simple>
</setBody>
<to id="endpoint-f697cad9-90db-47b9-877f-4189febdd010" uri="localmq://PERF.IN?connectionFactory=%23tcpActiveMQCF">
</to>
</route>
<bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="localmq">
<property name="configuration" ref="jmsConfig"/>
</bean>
<bean class="org.apache.camel.component.jms.JmsConfiguration" id="jmsConfig">
<property name="asyncStartListener" value="true"/>
<property name="cacheLevelName" value="CACHE_CONSUMER"/>
<property name="preserveMessageQos" value="true"/>
</bean>
<bean class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop" factor:bean-type="AMQ_CONNECTION_FACTORY"
id="tcpActiveMQCF" init-method="start" name="tcpActiveMQCF">
<property name="maxConnections" value="1"/>
<property name="maximumActiveSessionPerConnection" value="15"/>
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory"
id="tcpActiveMQCF_connection" name="tcpActiveMQCF_connection">
<property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=false"/>
</bean>
</property>
</bean>
对于这条路线,我的速度约为 2500 s/s,并且消息会立即写入队列,而更改参数 jms.useAsyncSend
实际上不会影响我的性能。
这种行为在发送非持久消息时是预期的,因为非持久消息是以非阻塞方式发送的。目前尚不清楚您是否正在发送非持久消息,但您还在客户端的 URL 上设置了 blockOnDurableSend=false
,因此即使是持久消息也将以非阻塞方式发送。
从代理的角度来看,消息实际上并未到达,因此无法查看已发送但尚未进入队列的消息数量。
如果您想确保当 Camel 路由终止时所有消息都写入队列,那么您应该发送持久消息并设置 blockOnDurableSend=true
(这是默认值)。
请记住,阻塞 会根据您的硬盘速度降低性能(可能会大幅降低)。这是因为客户端必须等待代理发送的每条消息的响应,并且对于代理收到的每条消息,它必须将该消息保存到磁盘并等待硬盘同步,然后再发送响应返回给客户端。所以,如果你的硬盘不能快速同步,客户端相对来说要等很久。
影响此行为的配置参数之一是 journal-buffer-timeout
。该值是在首次创建代理实例时自动计算和设置的。您将看到此记录的证据,例如:
Auto tuning journal ...
done! Your system can make 250 writes per millisecond, your journal-buffer-timeout will be 4000
在您的情况下,journal-buffer-timeout
已设置为 836000
,这非常慢(超时时间越长,磁盘越慢)。这意味着您的磁盘每毫秒只能进行 1.2 次写入。如果您认为此值有误,您可以 运行 artemis perf-journal
命令重新计算并相应地更新配置。
给你一个比较,我的 journal-buffer-timeout
是 4000
,我可以 运行 使用 ActiveMQ Artemis 的 artemis producer --protocol amqp
命令,它将在更少的时间内发送 1,000 条持久消息几 运行 秒后超过 700 毫秒。如果我使用 --non-persistent
标志,持续时间会下降到大约 200 毫秒。
如果我对 ActiveMQ 5.16.0 的默认安装执行相同的测试,则分别需要大约 900 和 200 毫秒,考虑到测试的性质,这并不奇怪。
值得注意的是,ActiveMQ 5.x 具有 jms.useAsyncSend
参数,其功能等同于 Artemis 中的 blockOnDurableSend
和 blockOnNonDurableSend
。但是,如果您使用它,您不太可能看到如此大的差异,因为 5.x 代理有很多固有的内部阻塞,而 Artemis 是从头开始编写的,完全是非阻塞的。因此,Artemis 的潜在性能上限远高于 5.x,这也是 Artemis 存在的主要原因之一。
请记住,无论是否阻止,您实际上只是分别以可靠性换取速度。通过不阻塞,您是在告诉客户“解雇后忘记”。根据定义,客户端不知道消息是否真的被代理成功接收。换句话说,从客户端的角度来看,以非阻塞方式发送消息本质上是不可靠的。这是您为速度所做的基本权衡。 JMS 2 添加了 javax.jms.CompletionListener
以帮助稍微缓解这种情况,但任何 Camel JMS 组件都不太可能智能地使用它。