JMS 连接在连接停止时将消息发送到队列
JMS Connection delivering messages sent to the queue while the connection was stopped
我遇到了 JMS Connection stop() 和 start() 的问题。一个简单的 java 程序说明了相同的是:
public class Test {
static Connection producerConn = null;
static BufferedWriter consumerLog = null;
static BufferedWriter producerLog = null;
public static final void main(String[] args) throws Exception {
ConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)");
producerConn = cf.createConnection();
producerLog = new BufferedWriter(new FileWriter("produced.log"));
consumerLog = new BufferedWriter(new FileWriter("consumed.log"));
new Thread(new Runnable() {
public void run() {
try {
producerConn.start();
Session session = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("SampleQ1");
MessageProducer producer = session.createProducer(queue);
Random random = new Random();
byte[] messageBytes = new byte[1024];
for (int i = 0; i < 100; i++) {
random.nextBytes(messageBytes);
Message message = session.createObjectMessage(messageBytes);
producer.send(message);
Thread.currentThread().sleep(10);
producerLog.write(message.getJMSMessageID());
producerLog.newLine();
producerLog.flush();
}
System.out.println("Produced 100000 messages...");
producerLog.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
}).start();
System.out.println("Started producer...");
new Thread(new Runnable() {
public void run() {
int count = 0;
try {
producerConn.start();
Session session = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("SampleQ1");
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new Test().new MyListener());
}
catch (Exception e) {
e.printStackTrace();
}
}
}).start();
System.out.println("Started consumer...");
}
private class MyListener implements MessageListener{
private int count = 0;
public void onMessage(Message message) {
try {
message.acknowledge();
System.out.println("count is " +count++);
if(count == 5){
producerConn.stop();
System.out.println("Sleeping Now for 5 seconds. . ." +System.currentTimeMillis());
Thread.currentThread().sleep(5000);
producerConn.start();
}
System.out.println("Waking up . . ." +System.currentTimeMillis());
consumerLog.write(message.getJMSMessageID());
consumerLog.newLine();
consumerLog.flush();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
}
我的想法是模拟连接stop()和start()。因此,在调用 stop() 后的消费者线程中,我放置了 5 秒的休眠。然而,与此同时,生产者线程继续其将消息发送到队列的工作。
我希望测试只使用在消费者调用 stop() 之前以及消费者从睡眠中醒来后再次调用 start() 之后传递的消息。但是这里发生的事情是,当消费者醒来时,它会从服务器读取所有消息,甚至是那些在消费者消息接收停止时发送到队列的消息。
我是不是做错了什么?
没有任何问题,这是正确的行为。在异步消息传递中,生产者和消费者松散地解耦。生产者不关心消费者是否正在消费消息。它不断地将消息放入队列,而消费者可能已关闭,或停止消费消息或积极消费消息。
connection.stop() 方法对生产者没有影响。它只影响消费者,stop() 方法暂停从 JMS 提供者到消费者的消息传递,而 start() 方法 starts/resumes 消息传递。
希望对您有所帮助。
我遇到了 JMS Connection stop() 和 start() 的问题。一个简单的 java 程序说明了相同的是:
public class Test {
static Connection producerConn = null;
static BufferedWriter consumerLog = null;
static BufferedWriter producerLog = null;
public static final void main(String[] args) throws Exception {
ConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)");
producerConn = cf.createConnection();
producerLog = new BufferedWriter(new FileWriter("produced.log"));
consumerLog = new BufferedWriter(new FileWriter("consumed.log"));
new Thread(new Runnable() {
public void run() {
try {
producerConn.start();
Session session = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("SampleQ1");
MessageProducer producer = session.createProducer(queue);
Random random = new Random();
byte[] messageBytes = new byte[1024];
for (int i = 0; i < 100; i++) {
random.nextBytes(messageBytes);
Message message = session.createObjectMessage(messageBytes);
producer.send(message);
Thread.currentThread().sleep(10);
producerLog.write(message.getJMSMessageID());
producerLog.newLine();
producerLog.flush();
}
System.out.println("Produced 100000 messages...");
producerLog.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
}).start();
System.out.println("Started producer...");
new Thread(new Runnable() {
public void run() {
int count = 0;
try {
producerConn.start();
Session session = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("SampleQ1");
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new Test().new MyListener());
}
catch (Exception e) {
e.printStackTrace();
}
}
}).start();
System.out.println("Started consumer...");
}
private class MyListener implements MessageListener{
private int count = 0;
public void onMessage(Message message) {
try {
message.acknowledge();
System.out.println("count is " +count++);
if(count == 5){
producerConn.stop();
System.out.println("Sleeping Now for 5 seconds. . ." +System.currentTimeMillis());
Thread.currentThread().sleep(5000);
producerConn.start();
}
System.out.println("Waking up . . ." +System.currentTimeMillis());
consumerLog.write(message.getJMSMessageID());
consumerLog.newLine();
consumerLog.flush();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
}
我的想法是模拟连接stop()和start()。因此,在调用 stop() 后的消费者线程中,我放置了 5 秒的休眠。然而,与此同时,生产者线程继续其将消息发送到队列的工作。
我希望测试只使用在消费者调用 stop() 之前以及消费者从睡眠中醒来后再次调用 start() 之后传递的消息。但是这里发生的事情是,当消费者醒来时,它会从服务器读取所有消息,甚至是那些在消费者消息接收停止时发送到队列的消息。
我是不是做错了什么?
没有任何问题,这是正确的行为。在异步消息传递中,生产者和消费者松散地解耦。生产者不关心消费者是否正在消费消息。它不断地将消息放入队列,而消费者可能已关闭,或停止消费消息或积极消费消息。
connection.stop() 方法对生产者没有影响。它只影响消费者,stop() 方法暂停从 JMS 提供者到消费者的消息传递,而 start() 方法 starts/resumes 消息传递。
希望对您有所帮助。