从 ActiveMQ 队列读取的独立 java 程序在代理关闭后不会自动重新连接
Standalone java program reading from ActiveMQ queue won't auto reconnect after broker is down
ActiveMQ 设置、服务器、属性都在 jndi.properties 文件中。
示例:
java.naming.provider.url=failover:(tcp://localhost:61616?keepAlive=true)
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
queue.MyQueue = testUpdate
虽然我的程序看起来像这样:
public class MQReader{
public final String JNDI_FACTORY = "ConnectionFactory";
public final String QUEUE = "MyQueue";
private QueueConnectionFactory queueConnectionFactory;
private QueueConnection queueConnection;
private QueueSession queueSession;
private QueueReceiver queueReceiver;
private Queue queue;
public static void main(String[] args) throws Exception {
// create a new intial context, which loads from jndi.properties file
javax.naming.Context ctx = new javax.naming.InitialContext();
MQReader reader = new MQReader();
reader.init(ctx);
try {
reader.wait();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
reader.close();
}
public void init(Context context) throws NamingException, JMSException {
queueConnectionFactory = (QueueConnectionFactory) context.lookup(JNDI_FACTORY);
queueConnection = queueConnectionFactory.createQueueConnection();
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queue = (Queue) context.lookup(QUEUE);
queueReceiver = queueSession.createReceiver(queue);
queueReceiver.setMessageListener(
message ->{
try {
if(message != null) {
//do stuff like print message for testing.
}
} catch (JMSException jmse) {
System.err.println("An exception occurred: " + jmse.getMessage());
}
}
);
queueConnection.start();
}
public void close() throws JMSException {
queueReceiver.close();
queueSession.close();
queueConnection.close();
}
}
我认为 jndi 中的故障转移项应该负责我的重新连接,但事实并非如此。我 运行 代理和 运行 程序,它运行良好,但是一旦我停止代理,我的消费者程序就退出,退出代码为 1。"Process finished with exit code 1"
我不确定我在这里做错了什么。我在很多地方添加了 print out 语句,发现它在 reader.wait() 处退出,没有触发任何异常。
我明白了。
由于我的主线程在我启动侦听器线程后退出并且侦听器线程完美地 运行(非守护线程),因此它保持 JVM 运行。一旦侦听器失去连接,程序就会退出,因为没有非守护线程 运行 了。故障转移协议代码是 运行 作为守护线程,因此 JVM 退出程序而不让故障转移协议代码重新连接。
所以我所做的是添加这段代码,这不是最好的方法,但它适用于我正在尝试做的事情。
Scanner in = new Scanner(System.in);
while(true){
System.out.println("Please enter \"stop\" to stop the program.");
String command = in.nextLine();
if("stop".equalsIgnoreCase(command)){
reader.close();
System.exit(0);
}
}
而不是
try {
reader.wait();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
reader.close();
wait 方法正在崩溃,并且没有使主线程保持活动状态。这也是一种让我的程序保持 运行 而不发送消息到队列来停止它的方法。
ActiveMQ 设置、服务器、属性都在 jndi.properties 文件中。
示例:
java.naming.provider.url=failover:(tcp://localhost:61616?keepAlive=true)
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
queue.MyQueue = testUpdate
虽然我的程序看起来像这样:
public class MQReader{
public final String JNDI_FACTORY = "ConnectionFactory";
public final String QUEUE = "MyQueue";
private QueueConnectionFactory queueConnectionFactory;
private QueueConnection queueConnection;
private QueueSession queueSession;
private QueueReceiver queueReceiver;
private Queue queue;
public static void main(String[] args) throws Exception {
// create a new intial context, which loads from jndi.properties file
javax.naming.Context ctx = new javax.naming.InitialContext();
MQReader reader = new MQReader();
reader.init(ctx);
try {
reader.wait();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
reader.close();
}
public void init(Context context) throws NamingException, JMSException {
queueConnectionFactory = (QueueConnectionFactory) context.lookup(JNDI_FACTORY);
queueConnection = queueConnectionFactory.createQueueConnection();
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queue = (Queue) context.lookup(QUEUE);
queueReceiver = queueSession.createReceiver(queue);
queueReceiver.setMessageListener(
message ->{
try {
if(message != null) {
//do stuff like print message for testing.
}
} catch (JMSException jmse) {
System.err.println("An exception occurred: " + jmse.getMessage());
}
}
);
queueConnection.start();
}
public void close() throws JMSException {
queueReceiver.close();
queueSession.close();
queueConnection.close();
}
}
我认为 jndi 中的故障转移项应该负责我的重新连接,但事实并非如此。我 运行 代理和 运行 程序,它运行良好,但是一旦我停止代理,我的消费者程序就退出,退出代码为 1。"Process finished with exit code 1"
我不确定我在这里做错了什么。我在很多地方添加了 print out 语句,发现它在 reader.wait() 处退出,没有触发任何异常。
我明白了。 由于我的主线程在我启动侦听器线程后退出并且侦听器线程完美地 运行(非守护线程),因此它保持 JVM 运行。一旦侦听器失去连接,程序就会退出,因为没有非守护线程 运行 了。故障转移协议代码是 运行 作为守护线程,因此 JVM 退出程序而不让故障转移协议代码重新连接。
所以我所做的是添加这段代码,这不是最好的方法,但它适用于我正在尝试做的事情。
Scanner in = new Scanner(System.in);
while(true){
System.out.println("Please enter \"stop\" to stop the program.");
String command = in.nextLine();
if("stop".equalsIgnoreCase(command)){
reader.close();
System.exit(0);
}
}
而不是
try {
reader.wait();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
reader.close();
wait 方法正在崩溃,并且没有使主线程保持活动状态。这也是一种让我的程序保持 运行 而不发送消息到队列来停止它的方法。