连续接收来自 Activemq 的消息
Recieve Message from Active MQ continously
下面是我的代码,用于从 java
中的活动 mq 接收消息
public void main(){
try{ // Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("myurl");
// Create a Connection
connection = connectionFactory.createConnection();
connection.start();
// System.out.println(connection.toString());
//connection.setExceptionListener(this);
// Create a Session
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue("myqueuename");
consumer = session.createConsumer(destination);
while (true)//never ending loop
{
message = consumer.receive(); //System would wait here to read the message from the queue
if (message instanceof TextMessage)
{
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println(text);
message.acknowledge();
session.commit();
}
}
}
finaly{
closeallconnection();
}
由于while条件为真,循环会一直运行和message.receive等待,直到有新消息。所以,这里的问题是我从 apache tomcat 中 运行ning 并且成功 运行ning,但是当我关闭 tomcat 时,这段代码是 运行即使我关闭 tomcat 也会落后。
如果我再次启动 tomcat,即使存在现有代码,也会有另一个服务 运行。
因此,我在这里需要的帮助是如何连续接收消息并等待新消息,以及当我关闭 tomcat 时如何关闭此 jms 侦听器。因为,我不知道队列中会有多少消息,也不知道我什么时候会收到消息。任何帮助和建议都会很棒。
我认为这不是最好的方法。它无法扩展。
你不想要一个listener/processor;你想要一堆。当消息到达队列时,侦听器将从池中签出,处理该消息,并在完成后返回池中。
这会扩展,因为多个侦听器可以处理繁重的负载。每个请求一条消息。
您展示的并不是真正的生产级代码。虽然执行接收循环不是一个坏主意,但您需要在幕后提供一些样板逻辑,例如将该代码放入可配置数量的线程中,处理异常和重新连接,并集成到您的应用程序生命周期中。
您通常需要一些 Web 应用程序框架,例如 spring 来帮助完成这些任务。
在 Spring 的最新版本中,您甚至可以使用注释来做一些事情,这使得事情变得非常简单。使用 spring 引导,您不需要繁重的配置,而是 属性 文件中的简单连接字符串。
@Component
public class MessageListener {
@JmsListener(destination = "my.queue")
public void receiveMessage(String msg) {
System.out.println("Received :" + msg);
}
}
我和你@GRK 有同样的问题,我不想使用无限循环来让消费者倾听。 This 是我发现似乎对我有用的最简单的解决方案。
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
try {
if (!(msg instanceof TextMessage))
throw new RuntimeException("no text message");
TextMessage tm = (TextMessage) msg;
System.out.println(tm.getText()); // print message
} catch (JMSException e) {
System.err.println("Error reading message");
}
}
});
通过使用 MessageListener 或扩展 JMS MessageListener 的 class,您可以处理任何传入的消息,并且仍然部署代码,而不是挂在无限循环中。
下面是我的代码,用于从 java
中的活动 mq 接收消息public void main(){
try{ // Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("myurl");
// Create a Connection
connection = connectionFactory.createConnection();
connection.start();
// System.out.println(connection.toString());
//connection.setExceptionListener(this);
// Create a Session
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue("myqueuename");
consumer = session.createConsumer(destination);
while (true)//never ending loop
{
message = consumer.receive(); //System would wait here to read the message from the queue
if (message instanceof TextMessage)
{
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println(text);
message.acknowledge();
session.commit();
}
}
}
finaly{
closeallconnection();
}
由于while条件为真,循环会一直运行和message.receive等待,直到有新消息。所以,这里的问题是我从 apache tomcat 中 运行ning 并且成功 运行ning,但是当我关闭 tomcat 时,这段代码是 运行即使我关闭 tomcat 也会落后。 如果我再次启动 tomcat,即使存在现有代码,也会有另一个服务 运行。 因此,我在这里需要的帮助是如何连续接收消息并等待新消息,以及当我关闭 tomcat 时如何关闭此 jms 侦听器。因为,我不知道队列中会有多少消息,也不知道我什么时候会收到消息。任何帮助和建议都会很棒。
我认为这不是最好的方法。它无法扩展。
你不想要一个listener/processor;你想要一堆。当消息到达队列时,侦听器将从池中签出,处理该消息,并在完成后返回池中。
这会扩展,因为多个侦听器可以处理繁重的负载。每个请求一条消息。
您展示的并不是真正的生产级代码。虽然执行接收循环不是一个坏主意,但您需要在幕后提供一些样板逻辑,例如将该代码放入可配置数量的线程中,处理异常和重新连接,并集成到您的应用程序生命周期中。
您通常需要一些 Web 应用程序框架,例如 spring 来帮助完成这些任务。
在 Spring 的最新版本中,您甚至可以使用注释来做一些事情,这使得事情变得非常简单。使用 spring 引导,您不需要繁重的配置,而是 属性 文件中的简单连接字符串。
@Component
public class MessageListener {
@JmsListener(destination = "my.queue")
public void receiveMessage(String msg) {
System.out.println("Received :" + msg);
}
}
我和你@GRK 有同样的问题,我不想使用无限循环来让消费者倾听。 This 是我发现似乎对我有用的最简单的解决方案。
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
try {
if (!(msg instanceof TextMessage))
throw new RuntimeException("no text message");
TextMessage tm = (TextMessage) msg;
System.out.println(tm.getText()); // print message
} catch (JMSException e) {
System.err.println("Error reading message");
}
}
});
通过使用 MessageListener 或扩展 JMS MessageListener 的 class,您可以处理任何传入的消息,并且仍然部署代码,而不是挂在无限循环中。