如何从一个流中读取并同时写入多个流?
How to read from one stream and write to several at once?
假设我有一个 readable
流,例如request(URL)
。我想通过 fs.createWriteStream()
和请求管道将其响应写入磁盘。但同时我想通过 crypto.createHash()
流计算下载数据的校验和。
readable -+-> calc checksum
|
+-> write to disk
而且我想即时进行,而不需要在内存中缓冲整个响应。
看来我可以使用 oldschool on('data')
钩子来实现它。下面的伪代码:
const hashStream = crypto.createHash('sha256');
hashStream.on('error', cleanup);
const dst = fs.createWriteStream('...');
dst.on('error', cleanup);
request(...).on('data', (chunk) => {
hashStream.write(chunk);
dst.write(chunk);
}).on('end', () => {
hashStream.end();
const checksum = hashStream.read();
if (checksum != '...') {
cleanup();
} else {
dst.end();
}
}).on('error', cleanup);
function cleanup() { /* cancel streams, erase file */ };
但是这样的做法看起来很别扭。我尝试使用 stream.Transform
或 stream.Writable
来实现类似 read | calc + echo | write
的东西,但我坚持执行。
Node.js 可读流有一个 .pipe
方法,它的工作方式与 unix 管道运算符非常相似,除了您可以流式传输 js 对象以及某种类型的字符串。
Here's a link to the doc on pipe
在您的案例中使用的示例可能是这样的:
const req = request(...);
req.pipe(dst);
req.pipe(hash);
请注意,您仍然必须处理每个流的错误,因为它们不会传播,并且如果出现可读错误,目标不会关闭。
假设我有一个 readable
流,例如request(URL)
。我想通过 fs.createWriteStream()
和请求管道将其响应写入磁盘。但同时我想通过 crypto.createHash()
流计算下载数据的校验和。
readable -+-> calc checksum
|
+-> write to disk
而且我想即时进行,而不需要在内存中缓冲整个响应。
看来我可以使用 oldschool on('data')
钩子来实现它。下面的伪代码:
const hashStream = crypto.createHash('sha256');
hashStream.on('error', cleanup);
const dst = fs.createWriteStream('...');
dst.on('error', cleanup);
request(...).on('data', (chunk) => {
hashStream.write(chunk);
dst.write(chunk);
}).on('end', () => {
hashStream.end();
const checksum = hashStream.read();
if (checksum != '...') {
cleanup();
} else {
dst.end();
}
}).on('error', cleanup);
function cleanup() { /* cancel streams, erase file */ };
但是这样的做法看起来很别扭。我尝试使用 stream.Transform
或 stream.Writable
来实现类似 read | calc + echo | write
的东西,但我坚持执行。
Node.js 可读流有一个 .pipe
方法,它的工作方式与 unix 管道运算符非常相似,除了您可以流式传输 js 对象以及某种类型的字符串。
Here's a link to the doc on pipe
在您的案例中使用的示例可能是这样的:
const req = request(...);
req.pipe(dst);
req.pipe(hash);
请注意,您仍然必须处理每个流的错误,因为它们不会传播,并且如果出现可读错误,目标不会关闭。