使用 Node JS 库的 Azure 服务总线队列的最大吞吐量?
Max throughput for Azure Service Bus Queue with Node JS library?
使用 Azure NodeJS 库,我无法以每秒 5 或 6 条消息的速度从 Azure 服务总线队列接收消息。这比官方文档建议的要慢几个数量级。我正在使用关闭分区的队列(如此处推荐 ),在 ReceiveAndDelete 模式下读取。本质上,我只是反复调用 .receiveQueueMessage()。
基于这个问题 (Azure Service Bus Scalability) 似乎其他人在使用 NodeJS 时也每秒看到 5/6 条消息。这几乎是一个硬性限制吗?任何已知的解决方法或优化?
你提供的线程中的代码很棒,它似乎在管道工作流中从Service Bus Queue接收消息,接收完上一条消息后将接收下一条消息。
并且如official site场景阶段高吞吐量队列部分所述,我们可以找到以下适合您情况的要点:
To increase the overall receive rate from the queue, use multiple
message factories to create receivers.
Use asynchronous operations to take advantage of client-side
batching.
Set the batching interval to 50ms to reduce the number of Service Bus
client protocol transmissions. If multiple senders are used, increase
the batching interval to 100ms.
为了实现这些优化,我们可以利用 sample on Azure github repo.
并最大化单个队列的吞吐量。我有一个简单的测试利用上面的示例代码并将循环时间设置为 10ms。
我测试的结果是:在ReceiveAndDelete模式下每秒会收到近50条消息;在 PeekLock 模式下,它每秒会收到大约 70 条消息,如 https://azure.microsoft.com/en-us/documentation/articles/service-bus-nodejs-how-to-use-queues/#receive-messages-from-a-queue
所示
var uuid = require('node-uuid');
var azure = require('azure');
var serviceBus = azure.createServiceBusService(connectionString);
function checkForMessages(sbService, queueName, callback) {
sbService.receiveQueueMessage(queueName,{ isPeekLock: true }, function (err, lockedMessage) {
if (err) {
if (err === 'No messages to receive') {
console.log('No messages');
} else {
callback(err);
}
} else {
callback(null, lockedMessage);
}
});
}
function processMessage(sbService, err, lockedMsg) {
if (err) {
console.log('Error on Rx: ', err);
} else {
console.log('Rx: ', lockedMsg);
sbService.deleteMessage(lockedMsg, function(err2) {
if (err2) {
console.log('Failed to delete message: ', err2);
} else {
console.log('Deleted message.');
}
})
}
}
var idx = 0;
function sendMessages(serviceBus, queueName) {
var msg = 'Message # ' + (++idx) + (' '+uuid.v4());
serviceBus.sendQueueMessage(queueName, msg, function (err) {
if (err) {
console.log('Failed Tx: ', err);
} else {
console.log('Sent ' + msg);
}
});
}
var queueName = 'myqueue';
serviceBus.getQueue(queueName, function (err,res) {
if (err) {
console.log('Failed: ', err);
} else {
console.log('current msg count '+ res.MessageCount);
// var t = setInterval(checkForMessages.bind(null, serviceBus, queueName,function(err, lockedMsg){}), 10); //ReceiveAndDelete mode
var t = setInterval(checkForMessages.bind(null, serviceBus, queueName, processMessage.bind(null, serviceBus)), 10); // PeekLock mode
// setInterval(sendMessages.bind(null, serviceBus, queueName), 100);
setTimeout(function(){
clearInterval(t);
console.log('task over');
},1000);
}
});
使用 Azure NodeJS 库,我无法以每秒 5 或 6 条消息的速度从 Azure 服务总线队列接收消息。这比官方文档建议的要慢几个数量级。我正在使用关闭分区的队列(如此处推荐
基于这个问题 (Azure Service Bus Scalability) 似乎其他人在使用 NodeJS 时也每秒看到 5/6 条消息。这几乎是一个硬性限制吗?任何已知的解决方法或优化?
你提供的线程中的代码很棒,它似乎在管道工作流中从Service Bus Queue接收消息,接收完上一条消息后将接收下一条消息。
并且如official site场景阶段高吞吐量队列部分所述,我们可以找到以下适合您情况的要点:
To increase the overall receive rate from the queue, use multiple message factories to create receivers.
Use asynchronous operations to take advantage of client-side batching.
Set the batching interval to 50ms to reduce the number of Service Bus client protocol transmissions. If multiple senders are used, increase the batching interval to 100ms.
为了实现这些优化,我们可以利用 sample on Azure github repo.
并最大化单个队列的吞吐量。我有一个简单的测试利用上面的示例代码并将循环时间设置为 10ms。 我测试的结果是:在ReceiveAndDelete模式下每秒会收到近50条消息;在 PeekLock 模式下,它每秒会收到大约 70 条消息,如 https://azure.microsoft.com/en-us/documentation/articles/service-bus-nodejs-how-to-use-queues/#receive-messages-from-a-queue
所示var uuid = require('node-uuid');
var azure = require('azure');
var serviceBus = azure.createServiceBusService(connectionString);
function checkForMessages(sbService, queueName, callback) {
sbService.receiveQueueMessage(queueName,{ isPeekLock: true }, function (err, lockedMessage) {
if (err) {
if (err === 'No messages to receive') {
console.log('No messages');
} else {
callback(err);
}
} else {
callback(null, lockedMessage);
}
});
}
function processMessage(sbService, err, lockedMsg) {
if (err) {
console.log('Error on Rx: ', err);
} else {
console.log('Rx: ', lockedMsg);
sbService.deleteMessage(lockedMsg, function(err2) {
if (err2) {
console.log('Failed to delete message: ', err2);
} else {
console.log('Deleted message.');
}
})
}
}
var idx = 0;
function sendMessages(serviceBus, queueName) {
var msg = 'Message # ' + (++idx) + (' '+uuid.v4());
serviceBus.sendQueueMessage(queueName, msg, function (err) {
if (err) {
console.log('Failed Tx: ', err);
} else {
console.log('Sent ' + msg);
}
});
}
var queueName = 'myqueue';
serviceBus.getQueue(queueName, function (err,res) {
if (err) {
console.log('Failed: ', err);
} else {
console.log('current msg count '+ res.MessageCount);
// var t = setInterval(checkForMessages.bind(null, serviceBus, queueName,function(err, lockedMsg){}), 10); //ReceiveAndDelete mode
var t = setInterval(checkForMessages.bind(null, serviceBus, queueName, processMessage.bind(null, serviceBus)), 10); // PeekLock mode
// setInterval(sendMessages.bind(null, serviceBus, queueName), 100);
setTimeout(function(){
clearInterval(t);
console.log('task over');
},1000);
}
});