Node Js / Typescript - AMQP 消费者
Node Js / Typescript - AMQP Consumer
我正在尝试节点。js/typescript 第一次,在为兔子队列制作消费者时遇到了一些麻烦。
代码:
let amqp = require('amqp');
let connection = amqp.createConnection({url: "amqp://" + RABBITMQ_USER + ":" + RABBITMQ_PASSWORD + "@" + RABBITMQ_HOST + ":" + RABBITMQ_PORT + RABBITMQ_VHOST});
connection.on('ready', function() {
connection.exchange(RABBITMQ_WORKER_EXCHANGE, function (exchange) {
connection.queue(RABBITMQ_QUEUE, function (queue) {
queue.bind(exchange, function() {
queue.publish(function (message) {
console.log('subscribed to queue');
let encoded_payload = unescape(message.data);
let payload = JSON.parse(encoded_payload);
console.log('Received a message:');
console.log(payload);
})
})
})
})
})
它似乎连接到 amqp 服务器并且没有抛出任何错误,但它只是坐在那里并且不消耗任何东西。有没有我遗漏的步骤?
如有任何帮助,我们将不胜感激,
谢谢。
这是我的解决方案,它基于 amqp 的 JS 教程。
https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html
可能不符合 TypeScript 标准,如果有更好的方法,请随时指正。
#!/usr/bin/env node
require('dotenv').config();
import amqp = require('amqplib/callback_api');
import db = require('./database');
amqp.connect({
protocol: process.env.RABBITMQ_PROTOCOL,
hostname: process.env.RABBITMQ_HOST,
port: process.env.RABBITMQ_PORT,
username: process.env.RABBITMQ_USER,
password: process.env.RABBITMQ_PASSWORD,
vhost: process.env.RABBITMQ_VHOST
}, function(err, conn) {
conn.createChannel(function (err, ch) {
// set exchange that is being used
ch.assertExchange(process.env.RABBITMQ_WORKER_EXCHANGE, 'direct', {durable: true});
// set queue that is being used
ch.assertQueue(process.env.RABBITMQ_QUEUE, {durable: true}, function (err, q) {
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
// bind the queue to the exchange
ch.bindQueue(q.queue, process.env.RABBITMQ_WORKER_EXCHANGE, '');
// consume from the queue, one message at a time.
ch.consume(q.queue, function (msg) {
console.log("Message received: %s", msg.content.toString());
//save message to db
db.store(msg.content.toString()).then(function() {
//acknowledge receipt of message to amqp
console.log("Acknowledging message");
ch.ack(msg, true);
});
}, {noAck: false});
});
});
});
import * as Amqp from "amqp-ts";
var connection = new Amqp.Connection("amqp://localhost");
var exchange = connection.declareExchange("ExchangeName");
var queue = connection.declareQueue("QueueName");
queue.bind(exchange);
queue.activateConsumer((message) => {
console.log("Message received: " + message.getContent());
});
// it is possible that the following message is not received because
// it can be sent before the queue, binding or consumer exist
var msg = new Amqp.Message("Test");
exchange.send(msg);
connection.completeConfiguration().then(() => {
// the following message will be received because
// everything you defined earlier for this connection now exists
var msg2 = new Amqp.Message("Test2");
exchange.send(msg2);
});
我正在尝试节点。js/typescript 第一次,在为兔子队列制作消费者时遇到了一些麻烦。
代码:
let amqp = require('amqp');
let connection = amqp.createConnection({url: "amqp://" + RABBITMQ_USER + ":" + RABBITMQ_PASSWORD + "@" + RABBITMQ_HOST + ":" + RABBITMQ_PORT + RABBITMQ_VHOST});
connection.on('ready', function() {
connection.exchange(RABBITMQ_WORKER_EXCHANGE, function (exchange) {
connection.queue(RABBITMQ_QUEUE, function (queue) {
queue.bind(exchange, function() {
queue.publish(function (message) {
console.log('subscribed to queue');
let encoded_payload = unescape(message.data);
let payload = JSON.parse(encoded_payload);
console.log('Received a message:');
console.log(payload);
})
})
})
})
})
它似乎连接到 amqp 服务器并且没有抛出任何错误,但它只是坐在那里并且不消耗任何东西。有没有我遗漏的步骤?
如有任何帮助,我们将不胜感激, 谢谢。
这是我的解决方案,它基于 amqp 的 JS 教程。 https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html
可能不符合 TypeScript 标准,如果有更好的方法,请随时指正。
#!/usr/bin/env node
require('dotenv').config();
import amqp = require('amqplib/callback_api');
import db = require('./database');
amqp.connect({
protocol: process.env.RABBITMQ_PROTOCOL,
hostname: process.env.RABBITMQ_HOST,
port: process.env.RABBITMQ_PORT,
username: process.env.RABBITMQ_USER,
password: process.env.RABBITMQ_PASSWORD,
vhost: process.env.RABBITMQ_VHOST
}, function(err, conn) {
conn.createChannel(function (err, ch) {
// set exchange that is being used
ch.assertExchange(process.env.RABBITMQ_WORKER_EXCHANGE, 'direct', {durable: true});
// set queue that is being used
ch.assertQueue(process.env.RABBITMQ_QUEUE, {durable: true}, function (err, q) {
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
// bind the queue to the exchange
ch.bindQueue(q.queue, process.env.RABBITMQ_WORKER_EXCHANGE, '');
// consume from the queue, one message at a time.
ch.consume(q.queue, function (msg) {
console.log("Message received: %s", msg.content.toString());
//save message to db
db.store(msg.content.toString()).then(function() {
//acknowledge receipt of message to amqp
console.log("Acknowledging message");
ch.ack(msg, true);
});
}, {noAck: false});
});
});
});
import * as Amqp from "amqp-ts";
var connection = new Amqp.Connection("amqp://localhost");
var exchange = connection.declareExchange("ExchangeName");
var queue = connection.declareQueue("QueueName");
queue.bind(exchange);
queue.activateConsumer((message) => {
console.log("Message received: " + message.getContent());
});
// it is possible that the following message is not received because
// it can be sent before the queue, binding or consumer exist
var msg = new Amqp.Message("Test");
exchange.send(msg);
connection.completeConfiguration().then(() => {
// the following message will be received because
// everything you defined earlier for this connection now exists
var msg2 = new Amqp.Message("Test2");
exchange.send(msg2);
});