保持 spring 上下文活动,直到使用 JMS 消息
Keep spring context alive until JMS messages are consumed
我有一个与 JMS
- Spring Boot
和 ActiveMQ
相关的非常标准的设置。它工作正常,直到我尝试进行简单的集成测试。经过一番调查后,我发现 Spring 上下文和嵌入式代理在第一个 JMS 消息被消费后都关闭了,无论在消费期间是否触发了另一个事件。我能够通过在测试设置中添加 useShutdownHook=false
连接选项来解决代理问题,即
spring.activemq.broker-url = vm://broker?async=false&broker.persistent=false&broker.useShutdownHook=false
我正在寻找的基本上是一种强制测试 "stay alive" 直到所有 JMS 消息都被消耗的方法(在本例中它们只有两个)。我了解整个设置的异步性质,但在测试期间,获取这些正在生成和使用的消息的所有结果仍然很有帮助。
下面是我的设置,虽然它相当简单。
@EnableJms
public class ActiveMqConfig {
@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setMessageConverter(messageConverter);
return jmsTemplate;
}
@Bean
public MessageConverter messageConverter() {
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
messageConverter.setTargetType(MessageType.TEXT);
messageConverter.setTypeIdPropertyName("_type");
return messageConverter;
}
}
然后我有一个监听给定事件的消息驱动的 POJO:
@JmsListener(destination = "events")
public void applicationSubmitted(MyType event) {
// do some work with the event here
jmsTemplate.convertAndSend("commands", mymessage);
}
还有一个:
@JmsListener(destination = "commands")
public void onCommand(TextMessage textMessage) {
}
我尝试过的一件事是添加延迟,即在消息发送后 sleep(200)
。然而,这是非常不可靠的,并且还会减慢测试速度,因为执行时间可能少于 50 毫秒。下面是测试本身。除非取消注释等待,否则我永远不会到达第二个事件侦听器,因为应用程序上下文关闭,测试结束并且消息为 "forgotten".
@SpringBootTest
class MyEventIntegrationTest extends Specification {
@Autowired
JmsTemplate jmsTemplate
def "My event is successfully handled"() {
given:
def event = new MyEvent()
when:
jmsTemplate.convertAndSend("events", event)
// sleep(200)
then:
1 == 1
}
}
我认为您问题的根源在于异步事件处理。发送事件后,您的测试就结束了。这当然会导致 Spring 上下文和代理关闭。 JMS 侦听器在另一个线程中 运行。你必须想办法等待他们。否则,您的线程(即您的测试用例)刚刚完成。
我们在上一个项目中遇到了类似的问题,并编写了一个小实用程序,对我们有很大帮助。 JMS 提供 "browse" 队列并查看它是否为空的能力:
public final class JmsUtil {
private static final int MAX_TRIES = 5000;
private final JmsTemplate jmsTemplate;
public JmsUtil(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
private int getMessageCount(String queueName) {
return jmsTemplate.browseSelected(queueName, "true = true", (s, qb) -> Collections.list(qb.getEnumeration()).size());
}
public void waitForAll(String queueName) {
int i = 0;
while (i <= MAX_TRIES) {
if (getMessageCount(queueName) == 0) {
return;
}
i++;
}
}
使用此实用程序,您可以执行以下操作:
def "My event is successfully handled"() {
given:
def event = new MyEvent()
when:
jmsTemplate.convertAndSend("events", event)
jmsUtility.waitForAll("events"); // wait until the event has been consumed
jmsUtility.waitForAll("commands"); // wait until the command has been consumed
then:
1 == 1
}
注意:此实用程序假定您将 JMS 消息发送到队列。通过浏览队列,我们可以检查它是否为空。如果是某个主题,您可能需要进行另一次检查。所以请注意这一点!
好吧,这是测试基于异步消息交换的系统时的标准问题。通常,它会在您跳过的测试部分中解决 - then
部分。
事实是,在您的测试中,您通常希望系统做一些有用的事情,例如在数据库中进行更改,向另一个系统发送休息调用,在另一个队列中发送消息等。我们可以等待一段时间,直到它通过不断检查结果发生 - 如果结果在时间内实现 window 我们已经设置 - 然后我们可以假设测试已经通过。
该方法的伪代码如下:
for (i to MAX_RETRIES; i++) {
checkThatTheChangesInDBHasBeenMade();
checkThatTheRestCallHasBeenMade();
checkThatTheMessageIsPostedInAnotherQueue();
Thread.sleep(50ms);
}
这种方式在最佳情况下您的测试将在 50 毫秒内通过。在最坏的情况下,它会失败,这将花费 MAX_RETRIES * 50ms 的时间来执行测试。
此外,我应该提到有一个名为 awaitility 的好工具,它提供了很好的 API(顺便说一句,它支持 groovy DSL)来处理此类问题异步世界:
await().atMost(5, SECONDS).until(customerStatusIsUpdated());
我有一个与 JMS
- Spring Boot
和 ActiveMQ
相关的非常标准的设置。它工作正常,直到我尝试进行简单的集成测试。经过一番调查后,我发现 Spring 上下文和嵌入式代理在第一个 JMS 消息被消费后都关闭了,无论在消费期间是否触发了另一个事件。我能够通过在测试设置中添加 useShutdownHook=false
连接选项来解决代理问题,即
spring.activemq.broker-url = vm://broker?async=false&broker.persistent=false&broker.useShutdownHook=false
我正在寻找的基本上是一种强制测试 "stay alive" 直到所有 JMS 消息都被消耗的方法(在本例中它们只有两个)。我了解整个设置的异步性质,但在测试期间,获取这些正在生成和使用的消息的所有结果仍然很有帮助。
下面是我的设置,虽然它相当简单。
@EnableJms
public class ActiveMqConfig {
@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setMessageConverter(messageConverter);
return jmsTemplate;
}
@Bean
public MessageConverter messageConverter() {
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
messageConverter.setTargetType(MessageType.TEXT);
messageConverter.setTypeIdPropertyName("_type");
return messageConverter;
}
}
然后我有一个监听给定事件的消息驱动的 POJO:
@JmsListener(destination = "events")
public void applicationSubmitted(MyType event) {
// do some work with the event here
jmsTemplate.convertAndSend("commands", mymessage);
}
还有一个:
@JmsListener(destination = "commands")
public void onCommand(TextMessage textMessage) {
}
我尝试过的一件事是添加延迟,即在消息发送后 sleep(200)
。然而,这是非常不可靠的,并且还会减慢测试速度,因为执行时间可能少于 50 毫秒。下面是测试本身。除非取消注释等待,否则我永远不会到达第二个事件侦听器,因为应用程序上下文关闭,测试结束并且消息为 "forgotten".
@SpringBootTest
class MyEventIntegrationTest extends Specification {
@Autowired
JmsTemplate jmsTemplate
def "My event is successfully handled"() {
given:
def event = new MyEvent()
when:
jmsTemplate.convertAndSend("events", event)
// sleep(200)
then:
1 == 1
}
}
我认为您问题的根源在于异步事件处理。发送事件后,您的测试就结束了。这当然会导致 Spring 上下文和代理关闭。 JMS 侦听器在另一个线程中 运行。你必须想办法等待他们。否则,您的线程(即您的测试用例)刚刚完成。
我们在上一个项目中遇到了类似的问题,并编写了一个小实用程序,对我们有很大帮助。 JMS 提供 "browse" 队列并查看它是否为空的能力:
public final class JmsUtil {
private static final int MAX_TRIES = 5000;
private final JmsTemplate jmsTemplate;
public JmsUtil(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
private int getMessageCount(String queueName) {
return jmsTemplate.browseSelected(queueName, "true = true", (s, qb) -> Collections.list(qb.getEnumeration()).size());
}
public void waitForAll(String queueName) {
int i = 0;
while (i <= MAX_TRIES) {
if (getMessageCount(queueName) == 0) {
return;
}
i++;
}
}
使用此实用程序,您可以执行以下操作:
def "My event is successfully handled"() {
given:
def event = new MyEvent()
when:
jmsTemplate.convertAndSend("events", event)
jmsUtility.waitForAll("events"); // wait until the event has been consumed
jmsUtility.waitForAll("commands"); // wait until the command has been consumed
then:
1 == 1
}
注意:此实用程序假定您将 JMS 消息发送到队列。通过浏览队列,我们可以检查它是否为空。如果是某个主题,您可能需要进行另一次检查。所以请注意这一点!
好吧,这是测试基于异步消息交换的系统时的标准问题。通常,它会在您跳过的测试部分中解决 - then
部分。
事实是,在您的测试中,您通常希望系统做一些有用的事情,例如在数据库中进行更改,向另一个系统发送休息调用,在另一个队列中发送消息等。我们可以等待一段时间,直到它通过不断检查结果发生 - 如果结果在时间内实现 window 我们已经设置 - 然后我们可以假设测试已经通过。
该方法的伪代码如下:
for (i to MAX_RETRIES; i++) {
checkThatTheChangesInDBHasBeenMade();
checkThatTheRestCallHasBeenMade();
checkThatTheMessageIsPostedInAnotherQueue();
Thread.sleep(50ms);
}
这种方式在最佳情况下您的测试将在 50 毫秒内通过。在最坏的情况下,它会失败,这将花费 MAX_RETRIES * 50ms 的时间来执行测试。
此外,我应该提到有一个名为 awaitility 的好工具,它提供了很好的 API(顺便说一句,它支持 groovy DSL)来处理此类问题异步世界:
await().atMost(5, SECONDS).until(customerStatusIsUpdated());