NodeJS - Seeing data events from readable stream without corresponding pause from writable stream
We've noticed that some of our streams in production have very high memory usage. The files in question are stored in S3; we open a readable stream on the S3 object and pipe that data to a file on the local filesystem of our EC2 instance. Some of our customers have very large files. In one case a customer had a file over 6GB in size, and the Node process handling it used so much memory that we nearly exhausted all swap space and the machine slowed to a crawl. Clearly there's a memory leak somewhere, and that's what I'm trying to track down.
In the meantime, I instrumented the code to log certain events from the streams. I have the code below, along with some sample log output from a run against a small test file. What baffles me is that the readable stream gets a pause event and then keeps emitting data and pause events WITHOUT the writable stream ever emitting a drain event. Am I completely missing something here? Once the readable stream is paused, how can it keep emitting data events before a drain has been received? The writable stream hasn't signaled that it's ready, so the readable stream shouldn't be sending it anything... right?
But look at the output. The first 3 events make sense to me: data, pause, drain. The next 3 are fine too: data, data, pause. But then it emits another data and another pause before the drain finally arrives as the 9th event. I don't understand why events 7 and 8 happened, since the drain didn't occur until event 9. After event 9 there's another run of data/pause pairs with no corresponding drain at all. Why? What I'd expect is some data events, then a pause, then NOTHING until a drain event fires, at which point data events can resume. As I see it, once a pause happens, no data events should occur at all until a drain fires. Maybe I'm still fundamentally misunderstanding something about Node streams?
Update: the docs don't say anything about a pause event being emitted by readable streams, but they do mention that a pause function is available. Presumably that gets called when the writable stream's write() returns false, and I'd assume the pause function emits a pause event. In any case, the docs seem to share my view of the world, assuming pause() is being called (the backpressure loop I have in mind is sketched after the quote below). See https://nodejs.org/docs/v0.10.30/api/stream.html#stream_class_stream_readable
This method will cause a stream in flowing-mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
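For reference, here's the backpressure handshake I'd expect pipe() to be performing under the hood. This is only a simplified sketch of the pattern, not Node's actual internals; readable and writable stand in for the S3 request stream and the file stream:
readable.on('data', function(chunk) {
    // write() returns false once the writable's internal buffer
    // exceeds its highWaterMark
    if (writable.write(chunk) === false) {
        readable.pause(); // stop emitting data events...
        writable.once('drain', function() {
            readable.resume(); // ...until the buffer has flushed
        });
    }
});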
This test was run on my development machine (Ubuntu 14.04, Node v0.10.37). Our EC2 instances in production are nearly the same; I believe they're currently running v0.10.30.
S3Service.prototype.getFile = function(bucket, key, fileName) {
    var deferred = Q.defer(),
        self = this,
        s3 = self.newS3(),
        fstream = fs.createWriteStream(fileName),
        shortname = _.last(fileName.split('/'));
    logger.debug('Get file from S3 at [%s] and write to [%s]', key, fileName);
    // create a readable stream that will retrieve the file from S3
    var request = s3.getObject({
        Bucket: bucket,
        Key: key
    }).createReadStream();
    // if network request errors out then we need to reject
    request.on('error', function(err) {
        logger.error(err, 'Error encountered on S3 network request');
        deferred.reject(err);
    })
    .on('data', function() {
        logger.info('data event from readable stream for [%s]', shortname);
    })
    .on('pause', function() {
        logger.info('pause event from readable stream for [%s]', shortname);
    });
    // resolve when our writable stream closes, or reject if we get some error
    fstream.on('close', function() {
        logger.info('close event from writable stream for [%s] -- done writing file', shortname);
        deferred.resolve();
    })
    .on('error', function(err) {
        logger.error(err, 'Error encountered writing stream to [%s]', fileName);
        deferred.reject(err);
    })
    .on('drain', function() {
        logger.info('drain event from writable stream for [%s]', shortname);
    });
    // pipe the S3 request stream into a writable file stream
    request.pipe(fstream);
    return deferred.promise;
};
[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.507Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.514Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.595Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.598Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.601Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.603Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.628Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.628Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.688Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.689Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.689Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.689Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.690Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.690Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.691Z] INFO: worker/7525 on bdmlinux: close event from writable stream for [FeedItem.csv] -- done writing file
There may be something quantum-like going on here, a case of "observing the phenomenon changes the outcome." Node introduced a new way of streaming in v0.10. From the docs:
If you attach a data event listener, then it will switch the stream into flowing mode, and data will be passed to your handler as soon as it is available.
In other words, attaching a data listener reverts the stream to the classic flowing mode. That's probably why the behavior you're seeing doesn't line up with what you've read elsewhere in the docs. To observe things without disturbing them, you could remove your on('data') handler and insert a stream of your own in between using through, like this:
var through = require('through');
var observer = through(function write(data) {
    console.log('Data!');
    this.queue(data);
}, function end() {
    this.queue(null);
});
request.pipe(observer).pipe(fstream);
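If you'd rather avoid the through dependency, a core Transform stream (available since v0.10) can play the same observer role. A minimal sketch, assuming the same request and fstream as above:
var stream = require('stream');
var util = require('util');

// A pass-through Transform that logs each chunk as it flows by.
function Observer() {
    stream.Transform.call(this);
}
util.inherits(Observer, stream.Transform);

Observer.prototype._transform = function(chunk, encoding, done) {
    console.log('Data!');
    this.push(chunk); // forward the chunk unchanged
    done();
};

request.pipe(new Observer()).pipe(fstream);
Because the Transform observes chunks from inside the stream machinery rather than via an extra data listener on request, it leaves pipe()'s normal backpressure handling intact.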