NodeJS - Seeing data events from readable stream without corresponding pause from writable stream

We're seeing very high memory usage from some of our streams in production. The files are stored in S3; we open a readable stream on the S3 object and then pipe that data to a file on the local filesystem of our EC2 instance. Some of our clients have very large files. In one case a client had a file over 6GB in size, and the Node process handling it used so much memory that we nearly exhausted all swap space and the machine slowed to a crawl. Clearly there's a memory leak somewhere, and that's what I'm trying to track down.

In the meantime, I augmented the code to log when we see certain events from the streams. I have the code below, along with some sample output from the log for a small test file. What puzzles me is that the readable stream receives a pause event and then continues to emit data and pause events WITHOUT the writable stream ever emitting a drain event. Am I completely missing something here? Once the readable stream is paused, how can it keep emitting data events before it receives a drain? The writable stream hasn't indicated that it's ready, so the readable stream shouldn't be sending anything... right?

But look at the output. The first 3 events make sense to me: data, pause, drain. Then the next 3 are fine: data, data, pause. But then it emits another data and another pause event before finally getting a drain as the 9th event. I don't understand why events 7 and 8 happened, since the drain didn't arrive until the 9th event. Then after the 9th event there's another slew of data/pause pairs without any corresponding drain. Why? What I'd expect is some number of data events, then a pause, then NOTHING until a drain event occurs, at which point data events could happen again. It seems to me that once a pause happens, no data events should occur at all until the drain event fires. Maybe I'm still fundamentally misunderstanding something about Node streams?

Update: The docs don't mention anything about a pause event being emitted by readable streams, but they do mention that a pause function is available. Presumably this gets called when the writable stream returns false, and I would assume the pause function emits the pause event. In any case, if pause() is invoked, the docs seem to agree with my view of the world. See https://nodejs.org/docs/v0.10.30/api/stream.html#stream_class_stream_readable

This method will cause a stream in flowing-mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
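The handshake I have in mind — pause the readable when write() returns false, resume it on drain — is, as I understand it, roughly what pipe() does internally. A sketch (the function and stream names here are mine, for illustration only):

```javascript
// Sketch of the classic backpressure handshake: stop reading when the
// writable's buffer fills up, start again once it has drained.
function pipeWithBackpressure(src, dst) {
  src.on('data', function(chunk) {
    if (!dst.write(chunk)) {        // write() returned false: buffer is full
      src.pause();                  // so stop emitting data events
      dst.once('drain', function() {
        src.resume();               // buffer flushed: start reading again
      });
    }
  });
  src.on('end', function() {
    dst.end();                      // propagate end-of-stream to the writable
  });
}
```

Under this model, no data event should ever arrive between a pause and the next drain — which is exactly what my log output contradicts.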

This test was run on my development machine (Ubuntu 14.04, Node v0.10.37). Our EC2 instances in production are nearly identical; I believe they're currently running v0.10.30.

S3Service.prototype.getFile = function(bucket, key, fileName) {
  var deferred = Q.defer(),
    self = this,
    s3 = self.newS3(),
    fstream = fs.createWriteStream(fileName),
    shortname = _.last(fileName.split('/'));

  logger.debug('Get file from S3 at [%s] and write to [%s]', key, fileName);

  // create a readable stream that will retrieve the file from S3
  var request = s3.getObject({
    Bucket: bucket,
    Key: key
  }).createReadStream();

  // if network request errors out then we need to reject
  request.on('error', function(err) {
      logger.error(err, 'Error encountered on S3 network request');
      deferred.reject(err);
    })
    .on('data', function() {
      logger.info('data event from readable stream for [%s]', shortname);
    })
    .on('pause', function() {
      logger.info('pause event from readable stream for [%s]', shortname);
    });

  // resolve when our writable stream closes, or reject if we get some error
  fstream.on('close', function() {
      logger.info('close event from writable stream for [%s] -- done writing file', shortname);
      deferred.resolve();
    })
    .on('error', function(err) {
      logger.error(err, 'Error encountered writing stream to [%s]', fileName);
      deferred.reject(err);
    })
    .on('drain', function() {
      logger.info('drain event from writable stream for [%s]', shortname);
    });

  // pipe the S3 request stream into a writable file stream
  request.pipe(fstream);

  return deferred.promise;
};

[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.507Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.514Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.595Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.598Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.601Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.603Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.628Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.628Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.688Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.689Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.689Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.689Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.690Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.690Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.691Z] INFO: worker/7525 on bdmlinux: close event from writable stream for [FeedItem.csv] -- done writing file

There may be a bit of a quantum "observing the phenomenon changes the outcome" situation here. Node introduced a new way of streaming in v0.10. From the docs:

If you attach a data event listener, then it will switch the stream into flowing mode, and data will be passed to your handler as soon as it is available.

In other words, attaching a data listener reverts the stream to the classic streaming mode. That's probably why the behavior you're seeing doesn't match what you've read in the rest of the documentation. To observe things unobtrusively, you could remove your on('data') handler and insert your own stream in between using through, like this:

var through = require('through');

var observer = through(function write(data) {
    console.log('Data!');
    this.queue(data);
}, function end() {
    this.queue(null);
});

request.pipe(observer).pipe(fstream);