以编程方式配置 Apache ActiveMQ
Configure Apache ActiveMQ programmatically
我正在使用 Apache ActiveMQ 对大量消息进行排队,然后在一天结束时将它们从队列中取出。不过,我对 ActiveMQ 的运行方式感到困惑。在我的 PC 上,我没有将 ActiveMQ 作为服务安装,也没有在某处安装服务器。我刚刚将 "activemq-all-5.14.5.jar" 作为 Maven 依赖包含在我的项目中,目前我正在使用以下代码:
public static void main(String[] args) throws URISyntaxException, Exception {
BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:4848)"));
broker.start();
Connection connection = null;
try {
// Producer
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:4848");
connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("customerQueue");
String payload = "Important Task";
Message msg = session.createTextMessage(payload);
MessageProducer producer = session.createProducer(queue);
System.out.println("Sending text '" + payload + "'");
msg.setLongProperty("_AMQ_SCHED_DELIVERY", System.currentTimeMillis() + 5000L);
producer.send(msg);
// Consumer
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
QueueBrowser browser = session.createBrowser(queue);
while (browser.getEnumeration().hasMoreElements()) {
TextMessage textMsg = (TextMessage) consumer.receive();
browser.getEnumeration().nextElement();
System.out.println(textMsg);
System.out.println("Received: " + textMsg.getText());
}
session.close();
} finally {
if (connection != null) {
connection.close();
}
broker.stop();
}
}
如您所见,我想将消息延迟 5 秒(或更长时间,这可能会有所不同),但在我找到的每个指南中,我都被指示配置 XML 配置文件。但是,这是一个当您 运行 ActiveMQ 作为服务时使用的文件。我目前正在使用 jar 库。
最初,我安装了 Glassgfish 服务器,以便使用 JMS 对所有消息进行排队,但从那时起我就放弃了该项目,但 ActiveMQ (localhost:4848) 仍在使用 IP。
请注意,以下是一个完美的示例 - KahaDB 也用于在服务器出现故障时存储消息。
就我而言,ActiveMQ 确实从我正在 运行 宁此代码的 STS 启动本地服务器,但配置文件在哪里?我可以以编程方式更改其属性吗?
这应该有效(适用于我们的 ActiveMQ 5.12.3)。请务必先清理您的 KahaDB 存储,以避免读取以前的消息。
public static void main(String[] args) throws Exception {
BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:4848)"));
broker.setSchedulerSupport(true);
broker.start();
Connection connection = null;
try {
// Producer
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:4848");
connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("customerQueue");
String payload = "Important Task";
Message msg = session.createTextMessage(payload);
MessageProducer producer = session.createProducer(queue);
System.out.println("Sending text '" + payload + "'");
msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5000L);
producer.send(msg);
connection.start();
// Consumer
MessageConsumer consumer = null;
consumer = session.createConsumer(queue);
QueueBrowser browser = session.createBrowser(queue);
while (browser.getEnumeration().hasMoreElements()) {
TextMessage textMsg = (TextMessage) consumer.receive();
browser.getEnumeration().nextElement();
System.out.println(textMsg);
System.out.println("Received: " + textMsg.getText());
}
session.close();
} finally{
if (connection != null) {
connection.close();
}
broker.stop();
}
}
第一次清理 运行(KahaDB 存储为空)不应输出
"Received: Important Task"
,而第二个会,如果你不删除中间的数据文件。
删除行`
msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5000L);
将使第一个干净的 运行 输出 "Received: Important Task"
我正在使用 Apache ActiveMQ 对大量消息进行排队,然后在一天结束时将它们从队列中取出。不过,我对 ActiveMQ 的运行方式感到困惑。在我的 PC 上,我没有将 ActiveMQ 作为服务安装,也没有在某处安装服务器。我刚刚将 "activemq-all-5.14.5.jar" 作为 Maven 依赖包含在我的项目中,目前我正在使用以下代码:
public static void main(String[] args) throws URISyntaxException, Exception {
BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:4848)"));
broker.start();
Connection connection = null;
try {
// Producer
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:4848");
connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("customerQueue");
String payload = "Important Task";
Message msg = session.createTextMessage(payload);
MessageProducer producer = session.createProducer(queue);
System.out.println("Sending text '" + payload + "'");
msg.setLongProperty("_AMQ_SCHED_DELIVERY", System.currentTimeMillis() + 5000L);
producer.send(msg);
// Consumer
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
QueueBrowser browser = session.createBrowser(queue);
while (browser.getEnumeration().hasMoreElements()) {
TextMessage textMsg = (TextMessage) consumer.receive();
browser.getEnumeration().nextElement();
System.out.println(textMsg);
System.out.println("Received: " + textMsg.getText());
}
session.close();
} finally {
if (connection != null) {
connection.close();
}
broker.stop();
}
}
如您所见,我想将消息延迟 5 秒(或更长时间,这可能会有所不同),但在我找到的每个指南中,我都被指示配置 XML 配置文件。但是,这是一个当您 运行 ActiveMQ 作为服务时使用的文件。我目前正在使用 jar 库。
最初,我安装了 Glassgfish 服务器,以便使用 JMS 对所有消息进行排队,但从那时起我就放弃了该项目,但 ActiveMQ (localhost:4848) 仍在使用 IP。
请注意,以下是一个完美的示例 - KahaDB 也用于在服务器出现故障时存储消息。
就我而言,ActiveMQ 确实从我正在 运行 宁此代码的 STS 启动本地服务器,但配置文件在哪里?我可以以编程方式更改其属性吗?
这应该有效(适用于我们的 ActiveMQ 5.12.3)。请务必先清理您的 KahaDB 存储,以避免读取以前的消息。
public static void main(String[] args) throws Exception {
BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:4848)"));
broker.setSchedulerSupport(true);
broker.start();
Connection connection = null;
try {
// Producer
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:4848");
connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("customerQueue");
String payload = "Important Task";
Message msg = session.createTextMessage(payload);
MessageProducer producer = session.createProducer(queue);
System.out.println("Sending text '" + payload + "'");
msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5000L);
producer.send(msg);
connection.start();
// Consumer
MessageConsumer consumer = null;
consumer = session.createConsumer(queue);
QueueBrowser browser = session.createBrowser(queue);
while (browser.getEnumeration().hasMoreElements()) {
TextMessage textMsg = (TextMessage) consumer.receive();
browser.getEnumeration().nextElement();
System.out.println(textMsg);
System.out.println("Received: " + textMsg.getText());
}
session.close();
} finally{
if (connection != null) {
connection.close();
}
broker.stop();
}
}
第一次清理 运行(KahaDB 存储为空)不应输出
"Received: Important Task"
,而第二个会,如果你不删除中间的数据文件。
删除行`
msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5000L);
将使第一个干净的 运行 输出 "Received: Important Task"