异步队列,文件流结束如何知道两者何时完成

Async queue, filestream end how to know when both finished

我在将 async.queue 与文件流一起使用时遇到了一个小问题

  1. 我有一个文件流将完成的场景
  2. 我将 fileRead 设置为 true
  3. 但是队列将为空并且已经调用了 drain
  4. 这导致我的 "done" 永远不会被调用

在我的文件流为 "end" 并且队列为空之后,说 "end the queue" 的正确方法是什么?

var fs = require('fs')
    , util = require('util')
    , stream = require('stream')
    , es = require('event-stream');

var async = require('async');

var fileRead = false;
var lineNr = 0;

var q = async.queue(function(task, callback) {
    task(function(err, lineData){
        responseLines.push(lineData);
        callback();
      });
  }, 5);

var q.drain = function() {
  if(fileRead){
    done(null, responseLines);
  }
}

var s = fs.createReadStream('very-large-file.csv')
    .pipe(es.split())
    .pipe(es.mapSync(function(line){
        s.pause();
        q.push(async.apply(insertIntoDb, line))
        s.resume();
    })
    .on('error', function(err){
       done(err);
    })
    .on('end', function(){
      fileRead = true;
    })
);

或者是否有更好的 async 用法可以让我做到这一点? 如果其中一行有错误,则逐行异步处理并能够提前退出

首先,我不确定您的代码段中有多少是伪代码,但 var q.drain = ... 无效 javascript 并且应该出错。它应该只是 q.drain = 因为你在现有对象上定义 属性 而不是声明新变量。如果不是伪代码,这可能就是为什么你的 drain 函数不会触发的原因。

有几种方法可以实现我认为您正在尝试做的事情。一种方法是检查终端处理程序中队列的长度,如果仍有要处理的项目,则设置 drain 函数。

.on('end', function(){
  if(!q.length){
    callDone();
  }
  else {
    q.drain = callDone;
  }
});

function callDone(){
  done(null, responseLines);
}

这实际上是在说 "if the queue's been processed call done, if not, call done when it has!" 我确信有很多方法可以整理您的代码,但希望这能为您的特定问题提供解决方案。