异步队列,文件流结束如何知道两者何时完成
Async queue, filestream end how to know when both finished
我在将 async.queue 与文件流一起使用时遇到了一个小问题
- 我有一个文件流将完成的场景
- 我将 fileRead 设置为 true
- 但是队列将为空并且已经调用了 drain
- 这导致我的 "done" 永远不会被调用
在我的文件流为 "end" 并且队列为空之后,说 "end the queue" 的正确方法是什么?
var fs = require('fs')
, util = require('util')
, stream = require('stream')
, es = require('event-stream');
var async = require('async');
var fileRead = false;
var lineNr = 0;
var q = async.queue(function(task, callback) {
task(function(err, lineData){
responseLines.push(lineData);
callback();
});
}, 5);
var q.drain = function() {
if(fileRead){
done(null, responseLines);
}
}
var s = fs.createReadStream('very-large-file.csv')
.pipe(es.split())
.pipe(es.mapSync(function(line){
s.pause();
q.push(async.apply(insertIntoDb, line))
s.resume();
})
.on('error', function(err){
done(err);
})
.on('end', function(){
fileRead = true;
})
);
或者是否有更好的 async 用法可以让我做到这一点?
如果其中一行有错误,则逐行异步处理并能够提前退出
首先,我不确定您的代码段中有多少是伪代码,但 var q.drain = ...
无效 javascript 并且应该出错。它应该只是 q.drain =
因为你在现有对象上定义 属性 而不是声明新变量。如果不是伪代码,这可能就是为什么你的 drain 函数不会触发的原因。
有几种方法可以实现我认为您正在尝试做的事情。一种方法是检查终端处理程序中队列的长度,如果仍有要处理的项目,则设置 drain 函数。
.on('end', function(){
if(!q.length){
callDone();
}
else {
q.drain = callDone;
}
});
function callDone(){
done(null, responseLines);
}
这实际上是在说 "if the queue's been processed call done, if not, call done when it has!" 我确信有很多方法可以整理您的代码,但希望这能为您的特定问题提供解决方案。
我在将 async.queue 与文件流一起使用时遇到了一个小问题
- 我有一个文件流将完成的场景
- 我将 fileRead 设置为 true
- 但是队列将为空并且已经调用了 drain
- 这导致我的 "done" 永远不会被调用
在我的文件流为 "end" 并且队列为空之后,说 "end the queue" 的正确方法是什么?
var fs = require('fs')
, util = require('util')
, stream = require('stream')
, es = require('event-stream');
var async = require('async');
var fileRead = false;
var lineNr = 0;
var q = async.queue(function(task, callback) {
task(function(err, lineData){
responseLines.push(lineData);
callback();
});
}, 5);
var q.drain = function() {
if(fileRead){
done(null, responseLines);
}
}
var s = fs.createReadStream('very-large-file.csv')
.pipe(es.split())
.pipe(es.mapSync(function(line){
s.pause();
q.push(async.apply(insertIntoDb, line))
s.resume();
})
.on('error', function(err){
done(err);
})
.on('end', function(){
fileRead = true;
})
);
或者是否有更好的 async 用法可以让我做到这一点? 如果其中一行有错误,则逐行异步处理并能够提前退出
首先,我不确定您的代码段中有多少是伪代码,但 var q.drain = ...
无效 javascript 并且应该出错。它应该只是 q.drain =
因为你在现有对象上定义 属性 而不是声明新变量。如果不是伪代码,这可能就是为什么你的 drain 函数不会触发的原因。
有几种方法可以实现我认为您正在尝试做的事情。一种方法是检查终端处理程序中队列的长度,如果仍有要处理的项目,则设置 drain 函数。
.on('end', function(){
if(!q.length){
callDone();
}
else {
q.drain = callDone;
}
});
function callDone(){
done(null, responseLines);
}
这实际上是在说 "if the queue's been processed call done, if not, call done when it has!" 我确信有很多方法可以整理您的代码,但希望这能为您的特定问题提供解决方案。