node.js 如何用背压处理快生产者和慢消费者
node.js how to handle fast producer and slow consumer with backpressure
我是 node.js 的新手,不了解有关流的文档。希望得到一些提示。
我正在读取一个非常大的文件行,然后我为每一行调用一个异步网络 api。
显然读取本地文件比完成异步调用快得多:
var lineReader = require('readline').createInterface({
input: require('fs').createReadStream(program.input)
});
lineReader.on('line', function (line) {
client.execute(query, [line], function(err, result) {
// needs to pressure the line reader here
var myJSON = JSON.stringify(result);
console.log("line=%s json=%s",myJSON);
});
});
"execute"方法中加反压的方法是什么?
解决方案是将异步行为包装在流写入器中,并从写入器中限制异步 reader:
val count = 0;
var writable = new stream.Writable({
write: function (line, encoding, next) {
count++;
if (count < concurrent) {
next();
}
asyncFunctionToCall(...) {
// completion callback
// reduce the count and release back pressure
count--;
next();
...
}
});
var stream = fs.createReadStream(program.input, {encoding: 'utf8'});
stream = byline.createStream(stream);
stream.pipe(writable);
我是 node.js 的新手,不了解有关流的文档。希望得到一些提示。
我正在读取一个非常大的文件行,然后我为每一行调用一个异步网络 api。
显然读取本地文件比完成异步调用快得多:
var lineReader = require('readline').createInterface({
input: require('fs').createReadStream(program.input)
});
lineReader.on('line', function (line) {
client.execute(query, [line], function(err, result) {
// needs to pressure the line reader here
var myJSON = JSON.stringify(result);
console.log("line=%s json=%s",myJSON);
});
});
"execute"方法中加反压的方法是什么?
解决方案是将异步行为包装在流写入器中,并从写入器中限制异步 reader:
val count = 0;
var writable = new stream.Writable({
write: function (line, encoding, next) {
count++;
if (count < concurrent) {
next();
}
asyncFunctionToCall(...) {
// completion callback
// reduce the count and release back pressure
count--;
next();
...
}
});
var stream = fs.createReadStream(program.input, {encoding: 'utf8'});
stream = byline.createStream(stream);
stream.pipe(writable);