正确配置 ActiveMQ 以避免 Producer 内存泄漏
Configuring ActiveMQ properly to avoid Producer memory leaks
我不是ActiveMQ专家,但我已经尝试在互联网上搜索了很多类似的问题,但我仍然很困惑。我有以下问题。
运行 Tomcat 8.x、Java 8、Spring Framework 4.3.18 中的 Web 应用程序。
我的 Web 应用程序使用 org.apache.activemq:activemq-spring:5.11.0
依赖项使用 ActiveMQ 发送和接收消息。
我正在以这种方式设置 ActiveMQ 连接工厂:
<amq:connectionFactory id="amqJmsFactory" brokerURL="${jms.broker.url}" />
<bean id="jmsConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory" ref="amqJmsFactory" />
<property name="maxConnections" value="2" />
<property name="idleTimeout" value="60000" />
<property name="timeBetweenExpirationCheckMillis" value="600000" />
<property name="maximumActiveSessionPerConnection" value="10" />
</bean>
最后的属性(maximumActiveSessionPerConnection
)已经被设置来尝试解决下面的问题(默认好像是500,恕我直言,这个值相当高),但我不确定它真的很有帮助,因为我仍然遇到 OutOfMemory 错误。
此连接工厂被侦听器容器工厂引用:
<jms:listener-container factory-id="activationJmsListenerContainerFactory"
container-type="default" connection-factory="jmsConnectionFactory"
concurrency="1" transaction-manager="centralTransactionManager">
</jms:listener-container>
一个 Spring 集成 4.3.17 入站适配器:
<int-jms:message-driven-channel-adapter id="invoiceEventJmsInboundChannelAdapter"
channel="incomingInvoiceEventJmsChannel"
connection-factory="jmsConnectionFactory"
destination-name="incomingEvent"
max-concurrent-consumers="2"
transaction-manager="customerTransactionManager"
error-channel="unexpectedErrorChannel" />
并通过两个出站适配器:
<int-jms:outbound-channel-adapter id="invoiceEventJmsOutboundChannelAdapter"
channel="outgoingInvoiceEventJmsChannel" destination-name="outgoingEvent"
connection-factory="jmsConnectionFactory" explicit-qos-enabled="true" delivery-persistent="true"
session-transacted="true" />
<int-jms:outbound-channel-adapter
id="passwordResetTokenSubmitterJmsOutboundChannelAdapter"
channel="passwordResetTokenSubmitterJmsChannel"
destination-name="passwordReset"
connection-factory="jmsConnectionFactory" explicit-qos-enabled="true"
delivery-persistent="false" session-transacted="false" />
一切正常,但我观察到 ActiveMQ 作为消息生成器(针对 invoiceEventJmsOutboundChannelAdapter
适配器)正在内存中累积大量对象,这导致我的应用程序出现 OutOfMemory 错误。我的消息可能有几 KB 长,因为它们的有效负载是 XML 个文件,但我不希望长时间保留这么多内存。
以下是我在最近的 OutOfMemory 错误(使用 Eclipse MAT 进行调查)时生成的堆转储的发现。发现了两个泄漏嫌疑人,都导致 ConnectionStateTracker
.
这是两个累加器之一:
Class Name | Shallow Heap | Retained Heap
-------------------------------------------------------------------------------------------------------------------------------------------
java.util.concurrent.ConcurrentHashMap$HashEntry[4] @ 0xe295da78 | 32 | 58.160.312
'- table java.util.concurrent.ConcurrentHashMap$Segment @ 0xe295da30 | 40 | 58.160.384
'- [15] java.util.concurrent.ConcurrentHashMap$Segment[16] @ 0xe295d9e0 | 80 | 68.573.384
'- segments java.util.concurrent.ConcurrentHashMap @ 0xe295d9b0 | 48 | 68.573.432
'- sessions org.apache.activemq.state.ConnectionState @ 0xe295d7e0 | 40 | 68.575.312
'- value java.util.concurrent.ConcurrentHashMap$HashEntry @ 0xe295d728 | 32 | 68.575.344
'- [1] java.util.concurrent.ConcurrentHashMap$HashEntry[2] @ 0xe295d710 | 24 | 68.575.368
'- table java.util.concurrent.ConcurrentHashMap$Segment @ 0xe295d6c8 | 40 | 68.575.440
'- [12] java.util.concurrent.ConcurrentHashMap$Segment[16] @ 0xe295d678 | 80 | 68.575.616
'- segments java.util.concurrent.ConcurrentHashMap @ 0xe295d648 | 48 | 68.575.664
'- connectionStates org.apache.activemq.state.ConnectionStateTracker @ 0xe295d620| 40 | 68.575.808
-------------------------------------------------------------------------------------------------------------------------------------------
如您所见,ConnectionStateTracker
的一个实例保留了大约 70 MB 的堆 space。有两个 ConnectionStateTracker
实例(我猜是每个出站适配器一个),它们总共保留了大约 120 MB 的堆。他们在 ConnectionState
的两个实例中累积它,这又具有一个 "sessions" 的映射,其中包含总共 10 个 SessionState
个实例,而这些实例又具有 ConcurrentHashMap
个生产者总共持有 1,258 ProducerState
个实例。它们在它们的 transactionState
字段中保留了那 120 MB 的堆,该字段的类型为 TransactionState
,而后者又具有一个 commands
ArrayList
,它似乎保留了整个消息我发了。
我的问题是:为什么 ActiveMQ 会在内存中保存已经发出的消息?将所有这些消息保存在内存中也存在一些安全问题。
我们在客户的站点遇到了同样的问题。
查看 ActiveMQ 代码,ConnectionStateTracker 中有一个 class RemoveTransactionAction,它删除条目以响应 OpenWire RemoveInfo(类型 12)消息。此消息似乎是 ActiveMQ 收到消息后生成的。
这是我最终解决这个问题的方法。
我认为这里的主要问题是 bug AMQ-6603。所以,我们做的第一件事就是升级到 ActiveMQ 5.15.8。我认为这足以修复泄漏。
但是,在发现不鼓励将池连接工厂与侦听器容器工厂一起使用后,我们也稍微更改了我们的配置。我认为 ActiveMQ 文档令人困惑,正确的 JMS 配置比它应该的更复杂。无论如何,如果您阅读 DefaultMessageListenerContainer
文档,您将阅读:
Don't use Spring's org.springframework.jms.connection.CachingConnectionFactory
in combination with dynamic scaling. Ideally, don't use it with a message listener container at all, since it is generally preferable to let the listener container itself handle appropriate caching within its lifecycle. Also, stopping and restarting a listener container will only work with an independent, locally cached Connection - not with an externally cached one.
这同样适用于 ActiveMQ PooledConnectionFactory
。但是,ActiveMQ 文档说:
Spring's MessagListenerContainer
should be used for message consumption. This provides all the power of MDBs - efficient JMS consumption and pooling of the message listeners - but without requiring a full EJB container.
You can use the activemq-pool org.apache.activemq.pool.PooledConnectionFactory
for efficient pooling of the connections and sessions for your collection of consumers, or you can use the Spring JMS org.springframework.jms.connection.CachingConnectionFactory
to achieve the same effect.
因此,ActiveMQ 文档提出了相反的建议。我为此打开了bug AMQ-7140。出于这个原因,我现在只向 JMS clients 注入 PooledConnectionFactory
(或 Spring CachingConnectionFactory
),而普通的非缓存在使用 Spring <jms:listener-container>
或 Spring Integration <int-jms:message-driven-channel-adapter>
构建侦听器容器工厂时,ActiveMQ 连接工厂(使用 <amq:connectionFactory>
构建),我宁愿将它们的属性设置为并发和缓存级别。
额外的困难是我们将本地构建的事务管理器传递给侦听器容器工厂,以便将 JMS 消息提交与数据库提交同步。这确实会导致侦听器容器工厂在默认情况下完全禁用其连接缓存机制,除非您还设置了显式缓存级别(请参阅 org.springframework.jms.listener.DefaultMessageListenerContainer.setCacheLevel(int)
Javadoc)。
我通过说很难找到解决方案来结束这个回答,特别是因为我没有从任何 ActiveMQ 开发人员那里得到任何反馈,无论是在 mailing list 还是在问题跟踪器上,即使恕我直言,这可能被视为一个安全问题。这种不活跃,加上缺乏替代方案,让我考虑是否仍应在我的下一个项目中考虑 JMS。
TL;DR
在您的 PooledConnectionFactory
上禁用 anonymousProducers
。
当使用 JMS t运行saction 和配置最多 8 个连接的 PooledConnectionFactory
时,我们遇到了一个非常相似的服务 运行 内存不足的问题。
但是,我们没有使用 DefaultMessageListenerContainer
或 Spring,并且只发送了一个制作人。
这个生产者负责从批处理作业中发送大量消息,我们发现当连接失败时,它会将这些消息留在附加到旧连接的 ConnectionStateTracker
上。在多次故障转移之后,这些消息会在旧连接上累积,直到我们 运行 超出堆。
似乎从内存中清除这些消息的唯一方法是在提交 JMS t运行saction 后关闭生产者。这将从 ConnectionStateTracker
上的 SessionState
中删除 ProducerState
实例。
SimonD 在他的回答中提到的对 RemoveTransactionAction
的调用(在提交 JMS t运行saction 后自动发生)只是从 ConnectionState
中删除了 TransactionState
,但仍将生产者及其消息留在 SessionState
对象上。
不幸的是 在生产者上调用 close()
不能立即使用 PooledConnectionFactory
,因为默认情况下它使用匿名生产者 - 调用close()
方法对匿名生产者无效。您必须先在 PooledConnectionFactory
上调用 setUseAnonymousProducers(false)
才能关闭生产者才能生效。
还值得指出的是,您必须在生产者上调用 close()
- 在会话上调用 close()
不会导致生产者关闭,尽管 JavaDoc 为 ActiveMQSession
建议。相反,它调用生产者的 dispose()
方法。
我不是ActiveMQ专家,但我已经尝试在互联网上搜索了很多类似的问题,但我仍然很困惑。我有以下问题。
运行 Tomcat 8.x、Java 8、Spring Framework 4.3.18 中的 Web 应用程序。
我的 Web 应用程序使用 org.apache.activemq:activemq-spring:5.11.0
依赖项使用 ActiveMQ 发送和接收消息。
我正在以这种方式设置 ActiveMQ 连接工厂:
<amq:connectionFactory id="amqJmsFactory" brokerURL="${jms.broker.url}" />
<bean id="jmsConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory" ref="amqJmsFactory" />
<property name="maxConnections" value="2" />
<property name="idleTimeout" value="60000" />
<property name="timeBetweenExpirationCheckMillis" value="600000" />
<property name="maximumActiveSessionPerConnection" value="10" />
</bean>
最后的属性(maximumActiveSessionPerConnection
)已经被设置来尝试解决下面的问题(默认好像是500,恕我直言,这个值相当高),但我不确定它真的很有帮助,因为我仍然遇到 OutOfMemory 错误。
此连接工厂被侦听器容器工厂引用:
<jms:listener-container factory-id="activationJmsListenerContainerFactory"
container-type="default" connection-factory="jmsConnectionFactory"
concurrency="1" transaction-manager="centralTransactionManager">
</jms:listener-container>
一个 Spring 集成 4.3.17 入站适配器:
<int-jms:message-driven-channel-adapter id="invoiceEventJmsInboundChannelAdapter"
channel="incomingInvoiceEventJmsChannel"
connection-factory="jmsConnectionFactory"
destination-name="incomingEvent"
max-concurrent-consumers="2"
transaction-manager="customerTransactionManager"
error-channel="unexpectedErrorChannel" />
并通过两个出站适配器:
<int-jms:outbound-channel-adapter id="invoiceEventJmsOutboundChannelAdapter"
channel="outgoingInvoiceEventJmsChannel" destination-name="outgoingEvent"
connection-factory="jmsConnectionFactory" explicit-qos-enabled="true" delivery-persistent="true"
session-transacted="true" />
<int-jms:outbound-channel-adapter
id="passwordResetTokenSubmitterJmsOutboundChannelAdapter"
channel="passwordResetTokenSubmitterJmsChannel"
destination-name="passwordReset"
connection-factory="jmsConnectionFactory" explicit-qos-enabled="true"
delivery-persistent="false" session-transacted="false" />
一切正常,但我观察到 ActiveMQ 作为消息生成器(针对 invoiceEventJmsOutboundChannelAdapter
适配器)正在内存中累积大量对象,这导致我的应用程序出现 OutOfMemory 错误。我的消息可能有几 KB 长,因为它们的有效负载是 XML 个文件,但我不希望长时间保留这么多内存。
以下是我在最近的 OutOfMemory 错误(使用 Eclipse MAT 进行调查)时生成的堆转储的发现。发现了两个泄漏嫌疑人,都导致 ConnectionStateTracker
.
这是两个累加器之一:
Class Name | Shallow Heap | Retained Heap
-------------------------------------------------------------------------------------------------------------------------------------------
java.util.concurrent.ConcurrentHashMap$HashEntry[4] @ 0xe295da78 | 32 | 58.160.312
'- table java.util.concurrent.ConcurrentHashMap$Segment @ 0xe295da30 | 40 | 58.160.384
'- [15] java.util.concurrent.ConcurrentHashMap$Segment[16] @ 0xe295d9e0 | 80 | 68.573.384
'- segments java.util.concurrent.ConcurrentHashMap @ 0xe295d9b0 | 48 | 68.573.432
'- sessions org.apache.activemq.state.ConnectionState @ 0xe295d7e0 | 40 | 68.575.312
'- value java.util.concurrent.ConcurrentHashMap$HashEntry @ 0xe295d728 | 32 | 68.575.344
'- [1] java.util.concurrent.ConcurrentHashMap$HashEntry[2] @ 0xe295d710 | 24 | 68.575.368
'- table java.util.concurrent.ConcurrentHashMap$Segment @ 0xe295d6c8 | 40 | 68.575.440
'- [12] java.util.concurrent.ConcurrentHashMap$Segment[16] @ 0xe295d678 | 80 | 68.575.616
'- segments java.util.concurrent.ConcurrentHashMap @ 0xe295d648 | 48 | 68.575.664
'- connectionStates org.apache.activemq.state.ConnectionStateTracker @ 0xe295d620| 40 | 68.575.808
-------------------------------------------------------------------------------------------------------------------------------------------
如您所见,ConnectionStateTracker
的一个实例保留了大约 70 MB 的堆 space。有两个 ConnectionStateTracker
实例(我猜是每个出站适配器一个),它们总共保留了大约 120 MB 的堆。他们在 ConnectionState
的两个实例中累积它,这又具有一个 "sessions" 的映射,其中包含总共 10 个 SessionState
个实例,而这些实例又具有 ConcurrentHashMap
个生产者总共持有 1,258 ProducerState
个实例。它们在它们的 transactionState
字段中保留了那 120 MB 的堆,该字段的类型为 TransactionState
,而后者又具有一个 commands
ArrayList
,它似乎保留了整个消息我发了。
我的问题是:为什么 ActiveMQ 会在内存中保存已经发出的消息?将所有这些消息保存在内存中也存在一些安全问题。
我们在客户的站点遇到了同样的问题。
查看 ActiveMQ 代码,ConnectionStateTracker 中有一个 class RemoveTransactionAction,它删除条目以响应 OpenWire RemoveInfo(类型 12)消息。此消息似乎是 ActiveMQ 收到消息后生成的。
这是我最终解决这个问题的方法。
我认为这里的主要问题是 bug AMQ-6603。所以,我们做的第一件事就是升级到 ActiveMQ 5.15.8。我认为这足以修复泄漏。
但是,在发现不鼓励将池连接工厂与侦听器容器工厂一起使用后,我们也稍微更改了我们的配置。我认为 ActiveMQ 文档令人困惑,正确的 JMS 配置比它应该的更复杂。无论如何,如果您阅读 DefaultMessageListenerContainer
文档,您将阅读:
Don't use Spring's
org.springframework.jms.connection.CachingConnectionFactory
in combination with dynamic scaling. Ideally, don't use it with a message listener container at all, since it is generally preferable to let the listener container itself handle appropriate caching within its lifecycle. Also, stopping and restarting a listener container will only work with an independent, locally cached Connection - not with an externally cached one.
这同样适用于 ActiveMQ PooledConnectionFactory
。但是,ActiveMQ 文档说:
Spring's
MessagListenerContainer
should be used for message consumption. This provides all the power of MDBs - efficient JMS consumption and pooling of the message listeners - but without requiring a full EJB container.You can use the activemq-pool
org.apache.activemq.pool.PooledConnectionFactory
for efficient pooling of the connections and sessions for your collection of consumers, or you can use the Spring JMSorg.springframework.jms.connection.CachingConnectionFactory
to achieve the same effect.
因此,ActiveMQ 文档提出了相反的建议。我为此打开了bug AMQ-7140。出于这个原因,我现在只向 JMS clients 注入 PooledConnectionFactory
(或 Spring CachingConnectionFactory
),而普通的非缓存在使用 Spring <jms:listener-container>
或 Spring Integration <int-jms:message-driven-channel-adapter>
构建侦听器容器工厂时,ActiveMQ 连接工厂(使用 <amq:connectionFactory>
构建),我宁愿将它们的属性设置为并发和缓存级别。
额外的困难是我们将本地构建的事务管理器传递给侦听器容器工厂,以便将 JMS 消息提交与数据库提交同步。这确实会导致侦听器容器工厂在默认情况下完全禁用其连接缓存机制,除非您还设置了显式缓存级别(请参阅 org.springframework.jms.listener.DefaultMessageListenerContainer.setCacheLevel(int)
Javadoc)。
我通过说很难找到解决方案来结束这个回答,特别是因为我没有从任何 ActiveMQ 开发人员那里得到任何反馈,无论是在 mailing list 还是在问题跟踪器上,即使恕我直言,这可能被视为一个安全问题。这种不活跃,加上缺乏替代方案,让我考虑是否仍应在我的下一个项目中考虑 JMS。
TL;DR
在您的 PooledConnectionFactory
上禁用 anonymousProducers
。
当使用 JMS t运行saction 和配置最多 8 个连接的 PooledConnectionFactory
时,我们遇到了一个非常相似的服务 运行 内存不足的问题。
但是,我们没有使用 DefaultMessageListenerContainer
或 Spring,并且只发送了一个制作人。
这个生产者负责从批处理作业中发送大量消息,我们发现当连接失败时,它会将这些消息留在附加到旧连接的 ConnectionStateTracker
上。在多次故障转移之后,这些消息会在旧连接上累积,直到我们 运行 超出堆。
似乎从内存中清除这些消息的唯一方法是在提交 JMS t运行saction 后关闭生产者。这将从 ConnectionStateTracker
上的 SessionState
中删除 ProducerState
实例。
SimonD 在他的回答中提到的对 RemoveTransactionAction
的调用(在提交 JMS t运行saction 后自动发生)只是从 ConnectionState
中删除了 TransactionState
,但仍将生产者及其消息留在 SessionState
对象上。
不幸的是 在生产者上调用 close()
不能立即使用 PooledConnectionFactory
,因为默认情况下它使用匿名生产者 - 调用close()
方法对匿名生产者无效。您必须先在 PooledConnectionFactory
上调用 setUseAnonymousProducers(false)
才能关闭生产者才能生效。
还值得指出的是,您必须在生产者上调用 close()
- 在会话上调用 close()
不会导致生产者关闭,尽管 JavaDoc 为 ActiveMQSession
建议。相反,它调用生产者的 dispose()
方法。