RabbitMQ请求,总是超时
RabbitMQ request, always timesout
我有一个奇怪的问题,即我的回调从未发布并且消息超时,即使该方法在队列中运行也是如此。这发生在某些特定的队列中,并且发生一次后,我无法从客户端发出任何其他请求,这些请求甚至以前都有效,它们都超时。必须重新启动客户端并服务器才能使其再次工作。
这是代码,它发生的地方,我似乎无法理解哪里出了问题。
Server.js 我创建队列的文件。我有几个这样的队列,这是其中之一。
var amqp = require('amqp');
var util = require('util');
var cnn = amqp.createConnection({host:'127.0.0.1'});
var getCart = require('./services/getCart');
cnn.on('ready', function() {
cnn.queue('getCart_queue', function(q){
q.subscribe(function(message, headers, deliveryInfo, m){
// util.log(util.format( deliveryInfo.routingKey, message));
// util.log("Message: "+JSON.stringify(message));
// util.log("DeliveryInfo: "+JSON.stringify(deliveryInfo));
getCart.handle_request(message, function(err,res){
cnn.publish(m.replyTo, res, {
contentType:'application/json',
contentEncoding:'utf-8',
correlationId:m.correlationId
});
});
});
});
});
这里handle request函数成功完成,但是回调一直没有通过,一直在另一端超时
var cart = require('../models/cart');
function handle_request(msg, callback) {
var user_id = msg.id;
cart
.find({id:user_id})
.populate('users ads')
.exec(function(err, results){
// This works, just the callback doesnt
if(!err){
console.log(results);
callback(null, results);
} else {
console.log(err);
callback(err, null);
}
});
}
exports.handle_request = handle_request;
这就是我调用请求的方式
var msg_payload = {"id":id};
mq_client.make_request('getCart_queue', msg_payload, function(err, results){
console.log(results); // never prints
//stuff that is never reached
});
These are my rpc files,我认为这些应该没有什么问题,因为其他一些队列工作正常。
这是客户端显示的错误
GET /getCart - - ms - -
Error: timeout 6ee0bd2a4b2ba1d8286e068b0f674d8f
at Timeout.<anonymous> (E:\Ebay_client\rpc\amqprpc.js:32:18)
at Timeout.ontimeout [as _onTimeout] (timers.js:341:34)
at tryOnTimeout (timers.js:232:11)
at Timer.listOnTimeout (timers.js:202:5)
希望信息不模糊,如果您需要更多信息,请告诉我。谢谢!
我认为错误出在这个文件中,因为我尝试调试并且从 rabbitmq 服务器调用回调并且它具有相关 ID 以及对变量的回复,所以没有选择请求到这里。
var amqp = require('amqp')
, crypto = require('crypto');
var TIMEOUT=8000;
var CONTENT_TYPE='application/json';
var CONTENT_ENCODING='utf-8';
var self;
exports = module.exports = AmqpRpc;
function AmqpRpc(connection){
self = this;
this.connection = connection;
this.requests = {};
this.response_queue = false;
}
AmqpRpc.prototype.makeRequest = function(queue_name, content, callback){
self = this;
var correlationId = crypto.randomBytes(16).toString('hex');
var tId = setTimeout(function(corr_id){
callback(new Error("timeout " + corr_id));
delete self.requests[corr_id];
}, TIMEOUT, correlationId);
var entry = {
callback:callback,
timeout: tId
};
self.requests[correlationId]=entry;
self.setupResponseQueue(function(){
self.connection.publish(queue_name, content, {
correlationId:correlationId,
contentType:CONTENT_TYPE,
contentEncoding:CONTENT_ENCODING,
replyTo:self.response_queue});
});
};
AmqpRpc.prototype.setupResponseQueue = function(next){
if(this.response_queue) return next();
self = this;
self.connection.queue('', {exclusive:true}, function(q){
self.response_queue = q.name;
q.subscribe(function(message, headers, deliveryInfo, m){
var correlationId = m.correlationId;
if(correlationId in self.requests){
var entry = self.requests[correlationId];
clearTimeout(entry.timeout);
delete self.requests[correlationId];
entry.callback(null, message);
}
});
return next();
});
};
这是 client.js
文件中 make_request()
的代码:
var amqp = require('amqp');
var connection = amqp.createConnection({host:'127.0.0.1'});
var rpc = new (require('./amqprpc'))(connection);
function make_request(queue_name, msg_payload, callback){
rpc.makeRequest(queue_name, msg_payload, function(err, response){
if(err)
console.error(err);
else{
console.log("response", response);
callback(null, response);
}
});
}
exports.make_request = make_request;
看看当你在 rpc.makeRequest()
上出错时会发生什么:
rpc.makeRequest(queue_name, msg_payload, function(err, response){
if(err)
console.error(err);
//
//HERE: should be a callback call here.
//
else{
console.log("response", response);
callback(null, response);
}
});
这可能就是您超时的原因。希望对你有帮助。
rabbitMQ 没有问题,但我在处理请求中的查询和响应请求后出现了问题。
对于遇到此问题的其他人,检查并仔细检查每个语句,因为错误不会显示在控制台中,只会显示超时
我有一个奇怪的问题,即我的回调从未发布并且消息超时,即使该方法在队列中运行也是如此。这发生在某些特定的队列中,并且发生一次后,我无法从客户端发出任何其他请求,这些请求甚至以前都有效,它们都超时。必须重新启动客户端并服务器才能使其再次工作。 这是代码,它发生的地方,我似乎无法理解哪里出了问题。
Server.js 我创建队列的文件。我有几个这样的队列,这是其中之一。
var amqp = require('amqp');
var util = require('util');
var cnn = amqp.createConnection({host:'127.0.0.1'});
var getCart = require('./services/getCart');
cnn.on('ready', function() {
cnn.queue('getCart_queue', function(q){
q.subscribe(function(message, headers, deliveryInfo, m){
// util.log(util.format( deliveryInfo.routingKey, message));
// util.log("Message: "+JSON.stringify(message));
// util.log("DeliveryInfo: "+JSON.stringify(deliveryInfo));
getCart.handle_request(message, function(err,res){
cnn.publish(m.replyTo, res, {
contentType:'application/json',
contentEncoding:'utf-8',
correlationId:m.correlationId
});
});
});
});
});
这里handle request函数成功完成,但是回调一直没有通过,一直在另一端超时
var cart = require('../models/cart');
function handle_request(msg, callback) {
var user_id = msg.id;
cart
.find({id:user_id})
.populate('users ads')
.exec(function(err, results){
// This works, just the callback doesnt
if(!err){
console.log(results);
callback(null, results);
} else {
console.log(err);
callback(err, null);
}
});
}
exports.handle_request = handle_request;
这就是我调用请求的方式
var msg_payload = {"id":id};
mq_client.make_request('getCart_queue', msg_payload, function(err, results){
console.log(results); // never prints
//stuff that is never reached
});
These are my rpc files,我认为这些应该没有什么问题,因为其他一些队列工作正常。 这是客户端显示的错误
GET /getCart - - ms - -
Error: timeout 6ee0bd2a4b2ba1d8286e068b0f674d8f
at Timeout.<anonymous> (E:\Ebay_client\rpc\amqprpc.js:32:18)
at Timeout.ontimeout [as _onTimeout] (timers.js:341:34)
at tryOnTimeout (timers.js:232:11)
at Timer.listOnTimeout (timers.js:202:5)
希望信息不模糊,如果您需要更多信息,请告诉我。谢谢!
我认为错误出在这个文件中,因为我尝试调试并且从 rabbitmq 服务器调用回调并且它具有相关 ID 以及对变量的回复,所以没有选择请求到这里。
var amqp = require('amqp')
, crypto = require('crypto');
var TIMEOUT=8000;
var CONTENT_TYPE='application/json';
var CONTENT_ENCODING='utf-8';
var self;
exports = module.exports = AmqpRpc;
function AmqpRpc(connection){
self = this;
this.connection = connection;
this.requests = {};
this.response_queue = false;
}
AmqpRpc.prototype.makeRequest = function(queue_name, content, callback){
self = this;
var correlationId = crypto.randomBytes(16).toString('hex');
var tId = setTimeout(function(corr_id){
callback(new Error("timeout " + corr_id));
delete self.requests[corr_id];
}, TIMEOUT, correlationId);
var entry = {
callback:callback,
timeout: tId
};
self.requests[correlationId]=entry;
self.setupResponseQueue(function(){
self.connection.publish(queue_name, content, {
correlationId:correlationId,
contentType:CONTENT_TYPE,
contentEncoding:CONTENT_ENCODING,
replyTo:self.response_queue});
});
};
AmqpRpc.prototype.setupResponseQueue = function(next){
if(this.response_queue) return next();
self = this;
self.connection.queue('', {exclusive:true}, function(q){
self.response_queue = q.name;
q.subscribe(function(message, headers, deliveryInfo, m){
var correlationId = m.correlationId;
if(correlationId in self.requests){
var entry = self.requests[correlationId];
clearTimeout(entry.timeout);
delete self.requests[correlationId];
entry.callback(null, message);
}
});
return next();
});
};
这是 client.js
文件中 make_request()
的代码:
var amqp = require('amqp');
var connection = amqp.createConnection({host:'127.0.0.1'});
var rpc = new (require('./amqprpc'))(connection);
function make_request(queue_name, msg_payload, callback){
rpc.makeRequest(queue_name, msg_payload, function(err, response){
if(err)
console.error(err);
else{
console.log("response", response);
callback(null, response);
}
});
}
exports.make_request = make_request;
看看当你在 rpc.makeRequest()
上出错时会发生什么:
rpc.makeRequest(queue_name, msg_payload, function(err, response){
if(err)
console.error(err);
//
//HERE: should be a callback call here.
//
else{
console.log("response", response);
callback(null, response);
}
});
这可能就是您超时的原因。希望对你有帮助。
rabbitMQ 没有问题,但我在处理请求中的查询和响应请求后出现了问题。
对于遇到此问题的其他人,检查并仔细检查每个语句,因为错误不会显示在控制台中,只会显示超时