如何从 Web 应用程序收听消息队列? (Tomcat, ActiveMQ)
How to listen to a message queue from a web application? (Tomcat, ActiveMQ)
我很高兴改进我的 Web 应用程序,它运行在 Apache Tomcat 上。添加了一个 ActiveMQ JMS 服务器来发送和接收消息。
我已经可以发送和接收消息,但需要接收方的帮助。
我的网络应用程序应该如何连续监听一个队列来接收消息?
新消息到达,服务器应对其进行操作。例如:将数据添加到数据库或发回消息。
我已经可以发消息了。这是代码。
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("clientQueue");
MessageProducer publisher = session.createProducer(queue);
connection.start();
Message message = null;
message = session.createTextMessage("Text Message");
publisher.send(message);
请求后我已经可以收到消息(点击 ;-))
connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("serverQueue");
consumer = session.createConsumer(destination);
while (true) {
Message message = consumer.receive(300000);
//Do message stuff
}
如何让web应用持续监听队列?
建议的方法是什么?
热烈感谢所有帮助。谢谢
编辑 - 解决方案
当前的工作解决方案与 DaveH
的建议
我已经添加了一个 ServletContextListener 来持续收听我的消息。
web.xml
<listener>
<listener-class>com.test.JMSContextListener</listener-class>
</listener>
听众:
public class JMSContextListener implements ServletContextListener {
@Override
public void contextInitialized(ServletContextEvent arg0) {
Thread thread = new Thread(new JMSConnector());
thread.start();
}
@Override
public void contextDestroyed(ServletContextEvent arg0) {
//Nothing
}
}
连接:
public class JMSConnector implements Runnable {
public void run() {
try {
Context context = new InitialContext();
QueueConnectionFactory factory = (QueueConnectionFactory) context.lookup("java:comp/env/jms/ConnectionFactory");
Connection connection = factory.createConnection();
Queue queue = (javax.jms.Queue) context.lookup("java:comp/env/jms/serverQueue");
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
//This MessageListener will do stuff with the message
MessageListenerImpl messageListener = new MessageListenerImpl();
consumer.setMessageListener(messageListener);
connection.start();
// Start connection or nothing will happen!!!
connection.start();
} catch (JMSException ex) {
//TODO
} catch (NamingException ex) {
//TODO
}
}
}
这是一个建议的方法还是应该改进?
热烈感谢所有帮助。谢谢
我正在使用 spring 来收听队列。
定义监听器是这样的:
<jms:listener-container connection-factory="jmsConnectionFactoryLocal">
<jms:listener destination="QUEUE_NAME" ref="channelManagerSimulatorDefault"/>
</jms:listener-container>
必须根据您的 MQ 创建 jmsConnectionFactoryLocal。在我的例子中是 IBM WebsphereMQ,所以 jmsConnectionFactoryLocal 定义是这样的:
<bean id="mqConnectionFactoryLocal" class="com.ibm.mq.jms.MQQueueConnectionFactory">
<property name="hostName">
<value>THE_MQ_SERVER_IP</value>
</property>
<property name="port">
<value>MQ_PORT</value>
</property>
<property name="queueManager">
<value>QUEUE_MANAGER_NAME</value>
</property>
<property name="transportType">
<value>1</value>
</property>
</bean>
<bean id="jmsConnectionFactoryLocal" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
<property name="targetConnectionFactory" ref="mqConnectionFactoryLocal"/>
<property name="username" value="USER_NAME"/>
<property name="password" value="PASSWORD"/>
</bean>
您必须为 ActiveMQ 找到正确的 ConnectionFactory 实现并使用它。侦听器和 jmsConnectionFactory 相同且独立于 MQ 提供程序。
我在我的 webApplication spring-mvc 和 JMS 模板中使用 activeMQ,方法如下。
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
</bean>
<bean id="messageSender" class="xyz.MessageSender"/>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="destination" />
</bean>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="REQUEST_QUEUE" />
</bean>
<bean id="messageListener" class="xtz.MessageListener">
<property name="listenerId" value="1" />
</bean>
<jms:listener-container connection-factory="connectionFactory">
<jms:listener destination="RESPONSE_QUEUE" ref="messageListener" method="messageReceived" />
</jms:listener-container>
Sender 和 Listener class 实现如下。
public class MessageSender
{
@Autowired
private JmsTemplate jmsTemplate;
public void sendMessage()
{
jmsTemplate.send(new MessageCreator()
{
public Message createMessage(Session session) throws JMSException
{
MapMessage message = session.createMapMessage();
message.setString("messageType", XXX);
message.setString("jsonMessage", XXXX);
return message;
}
});
}
}
public class MessageListener
{
private int listenerId;
@Override
public void messageReceived(Map<String, Object> message) throws Exception
{
//put your logic here
}
public int getListenerId()
{
return listenerId;
}
public void setListenerId(int listenerId)
{
this.listenerId = listenerId;
}
}
如果您的代码已经可以使用队列中的消息(看起来您这样做了),那么我认为您的问题归结为如何将那段代码获取到 运行。
看来您没有使用任何框架,所以我认为我将采取的方法是采用可以从队列中检索消息的代码,并 运行 它在应用程序的单独线程中服务器。让该线程在应用程序服务器启动时启动,并在应用程序服务器关闭时自行整理。
在应用服务器启动时启动线程的最简单方法是引入一个 ServletContextListener(示例 here。)在上下文监听器中,在单独的线程中启动队列监听代码。
编辑:我使用了这个建议的解决方案并将上面的代码添加到问题中。
在 Tomcat 中配置 JMS 队列:
https://martinsdeveloperworld.wordpress.com/2013/03/03/apache-activemq-and-tomcat/
将activemq-all-5.xx.jar放入$TOMCAT_HOME/lib
在Thread中循环监听,开始于@WebListener
:
contextDestroyed时停止
@WebListener
@Slf4j
public class JmsMailListener implements ServletContextListener {
private Thread listenerThread = null;
private QueueConnection connection;
@Override
public void contextInitialized(ServletContextEvent sce) {
try {
InitialContext initCtx = new InitialContext();
ActiveMQConnectionFactory connectionFactory =
(ActiveMQConnectionFactory) initCtx.lookup("java:comp/env/jms/ConnectionFactory");
connectionFactory.setTrustAllPackages(true);
connection = connectionFactory.createQueueConnection();
QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue) initCtx.lookup("java:comp/env/jms/queue/MailQueue");
QueueReceiver receiver = queueSession.createReceiver(queue);
connection.start();
log.info("Listen JMS messages ...");
listenerThread = new Thread(() -> {
try {
while (!Thread.interrupted()) {
Message m = receiver.receive();
if (m instanceof ObjectMessage) {
ObjectMessage om = (ObjectMessage) m;
MyObject myObject = (MyObject) om.getObject();
log.info("Received MyObject {}", myObject);
...
}
}
} catch (Exception e) {
log.error("Receiving messages failed: " + e.getMessage(), e);
}
});
listenerThread.start();
} catch (Exception e) {
log.error("JMS failed: " + e.getMessage(), e);
}
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
log.warn("Couldn't close JMSConnection: ", ex);
}
}
if (listenerThread != null) {
listenerThread.interrupt();
}
}
}
我很高兴改进我的 Web 应用程序,它运行在 Apache Tomcat 上。添加了一个 ActiveMQ JMS 服务器来发送和接收消息。
我已经可以发送和接收消息,但需要接收方的帮助。
我的网络应用程序应该如何连续监听一个队列来接收消息?
新消息到达,服务器应对其进行操作。例如:将数据添加到数据库或发回消息。
我已经可以发消息了。这是代码。
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("clientQueue");
MessageProducer publisher = session.createProducer(queue);
connection.start();
Message message = null;
message = session.createTextMessage("Text Message");
publisher.send(message);
请求后我已经可以收到消息(点击 ;-))
connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("serverQueue");
consumer = session.createConsumer(destination);
while (true) {
Message message = consumer.receive(300000);
//Do message stuff
}
如何让web应用持续监听队列? 建议的方法是什么?
热烈感谢所有帮助。谢谢
编辑 - 解决方案
当前的工作解决方案与 DaveH
的建议我已经添加了一个 ServletContextListener 来持续收听我的消息。
web.xml
<listener>
<listener-class>com.test.JMSContextListener</listener-class>
</listener>
听众:
public class JMSContextListener implements ServletContextListener {
@Override
public void contextInitialized(ServletContextEvent arg0) {
Thread thread = new Thread(new JMSConnector());
thread.start();
}
@Override
public void contextDestroyed(ServletContextEvent arg0) {
//Nothing
}
}
连接:
public class JMSConnector implements Runnable {
public void run() {
try {
Context context = new InitialContext();
QueueConnectionFactory factory = (QueueConnectionFactory) context.lookup("java:comp/env/jms/ConnectionFactory");
Connection connection = factory.createConnection();
Queue queue = (javax.jms.Queue) context.lookup("java:comp/env/jms/serverQueue");
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
//This MessageListener will do stuff with the message
MessageListenerImpl messageListener = new MessageListenerImpl();
consumer.setMessageListener(messageListener);
connection.start();
// Start connection or nothing will happen!!!
connection.start();
} catch (JMSException ex) {
//TODO
} catch (NamingException ex) {
//TODO
}
}
}
这是一个建议的方法还是应该改进?
热烈感谢所有帮助。谢谢
我正在使用 spring 来收听队列。 定义监听器是这样的:
<jms:listener-container connection-factory="jmsConnectionFactoryLocal">
<jms:listener destination="QUEUE_NAME" ref="channelManagerSimulatorDefault"/>
</jms:listener-container>
必须根据您的 MQ 创建 jmsConnectionFactoryLocal。在我的例子中是 IBM WebsphereMQ,所以 jmsConnectionFactoryLocal 定义是这样的:
<bean id="mqConnectionFactoryLocal" class="com.ibm.mq.jms.MQQueueConnectionFactory">
<property name="hostName">
<value>THE_MQ_SERVER_IP</value>
</property>
<property name="port">
<value>MQ_PORT</value>
</property>
<property name="queueManager">
<value>QUEUE_MANAGER_NAME</value>
</property>
<property name="transportType">
<value>1</value>
</property>
</bean>
<bean id="jmsConnectionFactoryLocal" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
<property name="targetConnectionFactory" ref="mqConnectionFactoryLocal"/>
<property name="username" value="USER_NAME"/>
<property name="password" value="PASSWORD"/>
</bean>
您必须为 ActiveMQ 找到正确的 ConnectionFactory 实现并使用它。侦听器和 jmsConnectionFactory 相同且独立于 MQ 提供程序。
我在我的 webApplication spring-mvc 和 JMS 模板中使用 activeMQ,方法如下。
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
</bean>
<bean id="messageSender" class="xyz.MessageSender"/>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="destination" />
</bean>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="REQUEST_QUEUE" />
</bean>
<bean id="messageListener" class="xtz.MessageListener">
<property name="listenerId" value="1" />
</bean>
<jms:listener-container connection-factory="connectionFactory">
<jms:listener destination="RESPONSE_QUEUE" ref="messageListener" method="messageReceived" />
</jms:listener-container>
Sender 和 Listener class 实现如下。
public class MessageSender
{
@Autowired
private JmsTemplate jmsTemplate;
public void sendMessage()
{
jmsTemplate.send(new MessageCreator()
{
public Message createMessage(Session session) throws JMSException
{
MapMessage message = session.createMapMessage();
message.setString("messageType", XXX);
message.setString("jsonMessage", XXXX);
return message;
}
});
}
}
public class MessageListener
{
private int listenerId;
@Override
public void messageReceived(Map<String, Object> message) throws Exception
{
//put your logic here
}
public int getListenerId()
{
return listenerId;
}
public void setListenerId(int listenerId)
{
this.listenerId = listenerId;
}
}
如果您的代码已经可以使用队列中的消息(看起来您这样做了),那么我认为您的问题归结为如何将那段代码获取到 运行。
看来您没有使用任何框架,所以我认为我将采取的方法是采用可以从队列中检索消息的代码,并 运行 它在应用程序的单独线程中服务器。让该线程在应用程序服务器启动时启动,并在应用程序服务器关闭时自行整理。
在应用服务器启动时启动线程的最简单方法是引入一个 ServletContextListener(示例 here。)在上下文监听器中,在单独的线程中启动队列监听代码。
编辑:我使用了这个建议的解决方案并将上面的代码添加到问题中。
在 Tomcat 中配置 JMS 队列: https://martinsdeveloperworld.wordpress.com/2013/03/03/apache-activemq-and-tomcat/
将activemq-all-5.xx.jar放入$TOMCAT_HOME/lib
在Thread中循环监听,开始于
@WebListener
:contextDestroyed时停止
@WebListener @Slf4j public class JmsMailListener implements ServletContextListener { private Thread listenerThread = null; private QueueConnection connection; @Override public void contextInitialized(ServletContextEvent sce) { try { InitialContext initCtx = new InitialContext(); ActiveMQConnectionFactory connectionFactory = (ActiveMQConnectionFactory) initCtx.lookup("java:comp/env/jms/ConnectionFactory"); connectionFactory.setTrustAllPackages(true); connection = connectionFactory.createQueueConnection(); QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = (Queue) initCtx.lookup("java:comp/env/jms/queue/MailQueue"); QueueReceiver receiver = queueSession.createReceiver(queue); connection.start(); log.info("Listen JMS messages ..."); listenerThread = new Thread(() -> { try { while (!Thread.interrupted()) { Message m = receiver.receive(); if (m instanceof ObjectMessage) { ObjectMessage om = (ObjectMessage) m; MyObject myObject = (MyObject) om.getObject(); log.info("Received MyObject {}", myObject); ... } } } catch (Exception e) { log.error("Receiving messages failed: " + e.getMessage(), e); } }); listenerThread.start(); } catch (Exception e) { log.error("JMS failed: " + e.getMessage(), e); } } @Override public void contextDestroyed(ServletContextEvent sce) { if (connection != null) { try { connection.close(); } catch (JMSException ex) { log.warn("Couldn't close JMSConnection: ", ex); } } if (listenerThread != null) { listenerThread.interrupt(); } } }