限制将持久消息批量发送到队列的 ActiveMQ 生产者的正确方法是什么?
What is the correct way to throttle ActiveMQ producers who send persistent messages in batches to a queue?
我有一个生产者,它以 批次 的形式将持久消息发送到利用 JMS 事务的队列。
我已经测试并发现 Producer Flow Control 在使用批处理大小为 1 时应用。我可以看到我的生产者根据我配置的内存限制受到限制队列。这是我的生产者流控制配置:
<policyEntry queue="foo" optimizedDispatch="true"
producerFlowControl="true" memoryLimit="1mb">
</policyEntry>
队列中待处理消息的数量是受控的,我认为这是生产者流控制在行动的证据。
但是,当批处理大小增加到 2 时,我发现这个内存限制没有得到遵守,生产者根本没有节流。证据是队列中待处理消息的数量继续增加,直到达到配置的 storeUsage 限制。
我理解这可能是因为当批处理大小大于 1 时消息以异步方式发送,即使我没有明确地将 useAsyncSend 设置为 真.
ActiveMQ 的 Producer Flow Control documentation 提到要限制异步发布者,我们需要在生产者中配置 Producer Window Size,这将强制 Producer 等待达到 window 限制后确认。
但是,当我在生产者中配置 Producer Window Size 并尝试批量发送消息时,抛出异常并且没有发送任何消息。
这让我思考并提出这个问题,"Is it possible to configure Producer Window Size while sending persistent messages in batches?"。
如果不是,那么限制以 批次 发送持久消息的生产者的正确方法是什么?
没有真正的方法来限制 "max msgs per second" 或类似的。您要做的是启用生产者流控制和 vm 游标,然后将该队列(如果您愿意,也可能是所有队列)的内存限制设置为某个合理的水平。
您可以在配置中决定如果达到队列内存限制,生产者应该挂起还是抛出异常。
<policyEntry queue="MY.BATCH.QUEUE" memoryLimit="100mb" producerFlowControl="true">
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
</policyEntry>
我在 v5.8.0 中发现了这个问题,但发现这个问题在 v5.9.0 及更高版本中得到了解决。
从 v5.9.0 开始,我发现 PFC 开箱即用,即使对于异步发送消息的生产者也是如此。
由于批量发送(其中批量大小 > 1)本质上是一个异步操作,这也适用于此。
但是 PFC wiki 令人困惑,因为它提到如果要应用 PFC,应该为异步生产者配置 ProducerWindowSize。但是,我测试并验证不需要这样做。
我基本上配置了 1mb 的每个目的地限制并分批发送消息(批大小为 100)。
我的制作人在没有任何额外配置的情况下开箱即用。队列中待处理消息的数量没有增加并且在控制之中。
通过一个简单的 Camel 消费者使用消息(并将它们附加到一个文件),我发现使用 v5.8.0(我遇到的问题),我可以在 36 秒内发送 100k 消息,有效负载为 2k .但其中大部分最终都变成了待处理消息。
但在 v5.9.0 中,发送同一组消息需要 176 秒,证明 PFC 所起的作用。在我的案例中,待处理消息的数量从未超过 1000。
我还使用 v5.10.0 和 v5.12.0(撰写本文时的最新版本)进行了测试,它们按预期工作。
因此,如果您遇到此问题,很可能您使用的是 运行 ActiveMQ v5.8.0 或更早版本。只需升级到最新版本即可解决此问题。
我感谢非常有帮助的 ActiveMQ 邮件列表人员提供的所有建议和帮助。
也感谢@Petter 的回答。抱歉,我没有在问题中提及我使用的版本,否则我相信您会立即发现问题。
我有一个生产者,它以 批次 的形式将持久消息发送到利用 JMS 事务的队列。
我已经测试并发现 Producer Flow Control 在使用批处理大小为 1 时应用。我可以看到我的生产者根据我配置的内存限制受到限制队列。这是我的生产者流控制配置:
<policyEntry queue="foo" optimizedDispatch="true"
producerFlowControl="true" memoryLimit="1mb">
</policyEntry>
队列中待处理消息的数量是受控的,我认为这是生产者流控制在行动的证据。
但是,当批处理大小增加到 2 时,我发现这个内存限制没有得到遵守,生产者根本没有节流。证据是队列中待处理消息的数量继续增加,直到达到配置的 storeUsage 限制。
我理解这可能是因为当批处理大小大于 1 时消息以异步方式发送,即使我没有明确地将 useAsyncSend 设置为 真.
ActiveMQ 的 Producer Flow Control documentation 提到要限制异步发布者,我们需要在生产者中配置 Producer Window Size,这将强制 Producer 等待达到 window 限制后确认。
但是,当我在生产者中配置 Producer Window Size 并尝试批量发送消息时,抛出异常并且没有发送任何消息。
这让我思考并提出这个问题,"Is it possible to configure Producer Window Size while sending persistent messages in batches?"。
如果不是,那么限制以 批次 发送持久消息的生产者的正确方法是什么?
没有真正的方法来限制 "max msgs per second" 或类似的。您要做的是启用生产者流控制和 vm 游标,然后将该队列(如果您愿意,也可能是所有队列)的内存限制设置为某个合理的水平。
您可以在配置中决定如果达到队列内存限制,生产者应该挂起还是抛出异常。
<policyEntry queue="MY.BATCH.QUEUE" memoryLimit="100mb" producerFlowControl="true">
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
</policyEntry>
我在 v5.8.0 中发现了这个问题,但发现这个问题在 v5.9.0 及更高版本中得到了解决。 从 v5.9.0 开始,我发现 PFC 开箱即用,即使对于异步发送消息的生产者也是如此。 由于批量发送(其中批量大小 > 1)本质上是一个异步操作,这也适用于此。 但是 PFC wiki 令人困惑,因为它提到如果要应用 PFC,应该为异步生产者配置 ProducerWindowSize。但是,我测试并验证不需要这样做。 我基本上配置了 1mb 的每个目的地限制并分批发送消息(批大小为 100)。
我的制作人在没有任何额外配置的情况下开箱即用。队列中待处理消息的数量没有增加并且在控制之中。
通过一个简单的 Camel 消费者使用消息(并将它们附加到一个文件),我发现使用 v5.8.0(我遇到的问题),我可以在 36 秒内发送 100k 消息,有效负载为 2k .但其中大部分最终都变成了待处理消息。
但在 v5.9.0 中,发送同一组消息需要 176 秒,证明 PFC 所起的作用。在我的案例中,待处理消息的数量从未超过 1000。
我还使用 v5.10.0 和 v5.12.0(撰写本文时的最新版本)进行了测试,它们按预期工作。
因此,如果您遇到此问题,很可能您使用的是 运行 ActiveMQ v5.8.0 或更早版本。只需升级到最新版本即可解决此问题。
我感谢非常有帮助的 ActiveMQ 邮件列表人员提供的所有建议和帮助。
也感谢@Petter 的回答。抱歉,我没有在问题中提及我使用的版本,否则我相信您会立即发现问题。