JMS 消费者阻止其他 JMSXGroups
JMS Consumer blocking other JMSXGroups
我正在尝试弄清楚当消息回滚到队列时是否可以影响消费者的消息处理顺序。
我在下面有一些简单的代码可以帮助我重现该问题。
我只是按特定顺序将消息推送到具有不同 JMSXGroupId 的队列中:
- "A1" (JMSXGroupId: 1)
- "B1" (JMSXGroupId: 2)
- "A2" (JMSXGroupId: 1)
- "C1" (JMSXGroupId: 3)
- "B2" (JMSXGroupId: 2)
代码使 A1 回滚(它最初重试消息 3 次)并延迟返回到队列。然而,消费者然后等待直到它可以再次拿起 A1(在等待延迟的时间之后),这意味着 B1 和 C1 组被阻塞在 A1 后面并且永远不会被处理。
理想情况下,我希望的是当 A1 被放回队列并被告知等待时,消费者会选择 B1 和 C1...我最终想要做的是停止一个 JMSXGroup 阻塞其他人在消费者身上。此外,可能值得补充的是,我需要保持 A (A1、A2、A3...) 的消息序列顺序,并希望通过将它们留在队列中来做到这一点,而不是必须为异常构建一些管理解决方案。
onException(Exception.class)
.log("Exception Caught !! ")
.redeliveryDelay("1000")
.maximumRedeliveries(3)
.handled(false)
.markRollbackOnly()
.log("log:output");
from("amq:queue:mailbox?concurrentConsumers=1")
.to(logEndpoint)
.process(exchange -> {
if(exchange.getIn().getBody(String.class).contains("A")) {
throw new Exception("Found A");
}
});
我正在使用基于 Java 的 Apache Camel 微服务和事务路由。
没什么特别的,但如果需要,我可以提供更多 detail/configuration 详细信息。
提前致谢
队列上的严格排序必然会 运行 出现这样的问题,因为队列必须遵守其基本的先进先出(即 FIFO)语义。即使在 A1 失败后立即拾取 B1,您也必须等到消费 A2 直到 A1 被消费以保持顺序,并且由于队列必须被消费 FIFO,这将阻止消费 C1 和任何其他消息后面。
您正在使用 Camel 的重新传递,它只会重新传递调用处理器方法(例如失败的地方),而不是整个路由。
您可能想要查看使用 JMS 事务并让消息一直回滚到 JMS 代理,并在消息代理上配置重新传递设置。
如果您有 Camel in Action 一书的副本,那么我建议您阅读交易章节和错误处理章节。
您的问题是因为消费者端点上的 concurrentConsumers=1
。如果您只有一个消费者,不可能并行处理 JMS 组。
有了这个限制,你实际上有一个 Exclusive Consumer 而 JMS 组 header 没有效果 因为无论如何只有一个消费者处理消息。
JMSXGroupId
header 确保具有相同组 ID 的所有消息都由同一消费者处理。因此,如果您有 3 个消费者,即使消息 A1 "blocks" 一段时间内是 A 组的消费者,您示例中的 3 个组也可以并行处理。
但是,当您的消费者 少于组 时,一条消息当然会阻塞其他组,原因很简单,因为一个消费者处理多个组。
我正在尝试弄清楚当消息回滚到队列时是否可以影响消费者的消息处理顺序。 我在下面有一些简单的代码可以帮助我重现该问题。 我只是按特定顺序将消息推送到具有不同 JMSXGroupId 的队列中:
- "A1" (JMSXGroupId: 1)
- "B1" (JMSXGroupId: 2)
- "A2" (JMSXGroupId: 1)
- "C1" (JMSXGroupId: 3)
- "B2" (JMSXGroupId: 2)
代码使 A1 回滚(它最初重试消息 3 次)并延迟返回到队列。然而,消费者然后等待直到它可以再次拿起 A1(在等待延迟的时间之后),这意味着 B1 和 C1 组被阻塞在 A1 后面并且永远不会被处理。
理想情况下,我希望的是当 A1 被放回队列并被告知等待时,消费者会选择 B1 和 C1...我最终想要做的是停止一个 JMSXGroup 阻塞其他人在消费者身上。此外,可能值得补充的是,我需要保持 A (A1、A2、A3...) 的消息序列顺序,并希望通过将它们留在队列中来做到这一点,而不是必须为异常构建一些管理解决方案。
onException(Exception.class)
.log("Exception Caught !! ")
.redeliveryDelay("1000")
.maximumRedeliveries(3)
.handled(false)
.markRollbackOnly()
.log("log:output");
from("amq:queue:mailbox?concurrentConsumers=1")
.to(logEndpoint)
.process(exchange -> {
if(exchange.getIn().getBody(String.class).contains("A")) {
throw new Exception("Found A");
}
});
我正在使用基于 Java 的 Apache Camel 微服务和事务路由。 没什么特别的,但如果需要,我可以提供更多 detail/configuration 详细信息。
提前致谢
队列上的严格排序必然会 运行 出现这样的问题,因为队列必须遵守其基本的先进先出(即 FIFO)语义。即使在 A1 失败后立即拾取 B1,您也必须等到消费 A2 直到 A1 被消费以保持顺序,并且由于队列必须被消费 FIFO,这将阻止消费 C1 和任何其他消息后面。
您正在使用 Camel 的重新传递,它只会重新传递调用处理器方法(例如失败的地方),而不是整个路由。
您可能想要查看使用 JMS 事务并让消息一直回滚到 JMS 代理,并在消息代理上配置重新传递设置。
如果您有 Camel in Action 一书的副本,那么我建议您阅读交易章节和错误处理章节。
您的问题是因为消费者端点上的 concurrentConsumers=1
。如果您只有一个消费者,不可能并行处理 JMS 组。
有了这个限制,你实际上有一个 Exclusive Consumer 而 JMS 组 header 没有效果 因为无论如何只有一个消费者处理消息。
JMSXGroupId
header 确保具有相同组 ID 的所有消息都由同一消费者处理。因此,如果您有 3 个消费者,即使消息 A1 "blocks" 一段时间内是 A 组的消费者,您示例中的 3 个组也可以并行处理。
但是,当您的消费者 少于组 时,一条消息当然会阻塞其他组,原因很简单,因为一个消费者处理多个组。