工作人员在消费时受阻 - RabbitMQ - Node.js
Worker blocked on consume - RabbitMQ - Node.js
我在 Node.js 中遇到 RabbitMQ 问题。
我正在尝试实现一个 Pub/Sub 连接器,其中每个用户都有自己的队列来轮询以获取消息。
当我通过 Postma 发布一条消息并且用户使用它时,我没有问题(我正确地收到消息),而如果用户试图使用队列中的消息(但没有消息存在),它会卡住直到新发布已发布,但我无法获取消息。
我想要做的是中止消费并稍后重试。
你能帮我吗?
get_queue 工作正常并获取用户的个人队列。
路线
app.post('/consume', (req, res) => {
project.get_queue(req, res)
.then(result1 => {return project.consume(req, res, result1.message);})
.then(result2 => {res.status(result2.status).json({ message: result2.message });})
.catch(err => res.status(err.status >= 100 && err.status < 600 ? err.status : 500).send({message: err.message}))
});
控制器
exports.consume = function(req, res, user) {
return new Promise((resolve,reject) => {
const queue = user.queueName;
amqpConn.createChannel(function(err, ch) {
if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });
ch.on("error", function(err){
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function() {
console.log("[AMQP] channel closed");
});
ch.assertQueue(queue, {durable: true},function(err, _ok){
if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });
console.log("#Msg: ", _ok.messageCount)
if(_ok.messageCount === 0) {
reject({ status: 400, message: 'No consuming' });
}
else {
var json = [];
var topic = ch.consume(queue, function(msg, err) {
work(msg, function(ok) {
try {
if (ok) {
ch.ack(msg);
json.push(JSON.parse(msg.content.toString()));
resolve({ status: 200, message: json});
}
else {
ch.reject(msg, true);
resolve({ status: 200, message: json});
}
} catch (e) {
reject({ status: 404, message: "niente da consumare"});
closeOnErr(e);
}});
}, {noAck: false});
})
})
};
这就是我到目前为止所做的。
exports.consume = function(req, res, user) {
return new Promise((resolve,reject) => {
const queue = user.queueName;
amqpConn.createChannel(function(err, ch) {
if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });;
ch.on("error", function(err){
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function() {
console.log("[AMQP] channel closed");
});
var json = [];
iterate(ch, queue, json);
})
})
};
function iterate(ch, queue, json) {
ch.get(queue, {
noAck: false
}, function (err, msg) {
if (!msg) return resolve({
status: 200,
message: json
});
work(msg, function (ok) {
console.log("Errore?");
console.log("MSG consumed: ", msg.content.toString());
try {
if (ok) {
ch.ack(msg);
json.push(JSON.parse(msg.content.toString()));
return iterate(ch, queue, json);
}
ch.reject(msg, true);
return resolve({
status: 200,
message: json
});
} catch (e) {
reject({
status: 404,
message: "niente da consumare"
});
closeOnErr(e);
}
});
})}
将ch.get与迭代器一起使用:
exports.consume = function(req, res, user) {
return new Promise((resolve, reject) => {
const queue = user.queueName;
amqpConn.createChannel(function(err, ch) {
if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });;
ch.on("error", function(err){
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function() {
console.log("[AMQP] channel closed");
});
var json = [];
return iterate(ch, queue, json, resolve, reject);
});
});
};
function iterate(ch, queue, json, resolve, reject) {
ch.get(queue, {
noAck: false
}, function (err, msg) {
if (!msg) return resolve({
status: 200,
message: json
});
work(msg, function (ok) {
console.log("Errore?");
console.log("MSG consumed: ", msg.content.toString());
try {
if (ok) {
ch.ack(msg);
json.push(JSON.parse(msg.content.toString()));
return iterate(ch, queue, json, resolve, reject);
}
ch.reject(msg, true);
return resolve({
status: 200,
message: json
});
} catch (e) {
closeOnErr(e);
return reject({
status: 404,
message: "niente da consumare"
});
}
});
});
}
它将 运行 函数 iterate()
,如果 msg
是 false
,它将解析,否则它将处理消息,如果结果来自 [=14] =] 不ok,则reject并停止迭代,如果结果ok,则再次迭代;
我在 Node.js 中遇到 RabbitMQ 问题。 我正在尝试实现一个 Pub/Sub 连接器,其中每个用户都有自己的队列来轮询以获取消息。 当我通过 Postma 发布一条消息并且用户使用它时,我没有问题(我正确地收到消息),而如果用户试图使用队列中的消息(但没有消息存在),它会卡住直到新发布已发布,但我无法获取消息。 我想要做的是中止消费并稍后重试。 你能帮我吗?
get_queue 工作正常并获取用户的个人队列。
路线
app.post('/consume', (req, res) => {
project.get_queue(req, res)
.then(result1 => {return project.consume(req, res, result1.message);})
.then(result2 => {res.status(result2.status).json({ message: result2.message });})
.catch(err => res.status(err.status >= 100 && err.status < 600 ? err.status : 500).send({message: err.message}))
});
控制器
exports.consume = function(req, res, user) {
return new Promise((resolve,reject) => {
const queue = user.queueName;
amqpConn.createChannel(function(err, ch) {
if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });
ch.on("error", function(err){
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function() {
console.log("[AMQP] channel closed");
});
ch.assertQueue(queue, {durable: true},function(err, _ok){
if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });
console.log("#Msg: ", _ok.messageCount)
if(_ok.messageCount === 0) {
reject({ status: 400, message: 'No consuming' });
}
else {
var json = [];
var topic = ch.consume(queue, function(msg, err) {
work(msg, function(ok) {
try {
if (ok) {
ch.ack(msg);
json.push(JSON.parse(msg.content.toString()));
resolve({ status: 200, message: json});
}
else {
ch.reject(msg, true);
resolve({ status: 200, message: json});
}
} catch (e) {
reject({ status: 404, message: "niente da consumare"});
closeOnErr(e);
}});
}, {noAck: false});
})
})
};
这就是我到目前为止所做的。
exports.consume = function(req, res, user) {
return new Promise((resolve,reject) => {
const queue = user.queueName;
amqpConn.createChannel(function(err, ch) {
if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });;
ch.on("error", function(err){
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function() {
console.log("[AMQP] channel closed");
});
var json = [];
iterate(ch, queue, json);
})
})
};
function iterate(ch, queue, json) {
ch.get(queue, {
noAck: false
}, function (err, msg) {
if (!msg) return resolve({
status: 200,
message: json
});
work(msg, function (ok) {
console.log("Errore?");
console.log("MSG consumed: ", msg.content.toString());
try {
if (ok) {
ch.ack(msg);
json.push(JSON.parse(msg.content.toString()));
return iterate(ch, queue, json);
}
ch.reject(msg, true);
return resolve({
status: 200,
message: json
});
} catch (e) {
reject({
status: 404,
message: "niente da consumare"
});
closeOnErr(e);
}
});
})}
将ch.get与迭代器一起使用:
exports.consume = function(req, res, user) {
return new Promise((resolve, reject) => {
const queue = user.queueName;
amqpConn.createChannel(function(err, ch) {
if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });;
ch.on("error", function(err){
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function() {
console.log("[AMQP] channel closed");
});
var json = [];
return iterate(ch, queue, json, resolve, reject);
});
});
};
function iterate(ch, queue, json, resolve, reject) {
ch.get(queue, {
noAck: false
}, function (err, msg) {
if (!msg) return resolve({
status: 200,
message: json
});
work(msg, function (ok) {
console.log("Errore?");
console.log("MSG consumed: ", msg.content.toString());
try {
if (ok) {
ch.ack(msg);
json.push(JSON.parse(msg.content.toString()));
return iterate(ch, queue, json, resolve, reject);
}
ch.reject(msg, true);
return resolve({
status: 200,
message: json
});
} catch (e) {
closeOnErr(e);
return reject({
status: 404,
message: "niente da consumare"
});
}
});
});
}
它将 运行 函数 iterate()
,如果 msg
是 false
,它将解析,否则它将处理消息,如果结果来自 [=14] =] 不ok,则reject并停止迭代,如果结果ok,则再次迭代;