Node.js: for..of with pipeline 不等待完成
Node.js: for..of with pipeline not waiting for completion
我正在使用带有管道的 for..of 循环,但是循环之后的语句甚至在管道完成执行之前就已执行,即使我将 await 添加到管道中也会发生这种情况
这是我的相关代码
for(const m of metadata) {
if(m.path) {
let dir = `tmp/exports/${exportId}/csv_files_tranformed/${m.type}`;
let fname = `${dir}/${m.sname}`;
fs.mkdirSync(dir,{recursive: true}, (err) => {
if(err) throw err;
});
tempm = m;
await pipeline(
fs.createReadStream(m.path),
csv.parse({delimiter: '\t', columns: true}),
csv.transform((input) => {
return input;
}),
csv.stringify({header: true, delimiter: '\t'}),
fs.createWriteStream(fname, {encoding: 'utf16le'}),
(err) => {
if (err) {
console.error('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
}
)
}
}
如何确保管道在进入下一个语句之前完全完成。
谢谢。
您应该提供 pipeline
函数的来源。它可能是第三方库或来自节点的 stream
如果它是 stream.pipeline
而不是它的 return 是流,那么 await pipeline(...
不会等待,因为它不是 Promise
。您可以使用 util, Reference
将流变成 Promise
const util = require('util');
const { pipeline } = require('stream');
const pipelinePromise = util.promisify(pipeline);
// ...
await pipelinePromise();
我正在使用带有管道的 for..of 循环,但是循环之后的语句甚至在管道完成执行之前就已执行,即使我将 await 添加到管道中也会发生这种情况
这是我的相关代码
for(const m of metadata) {
if(m.path) {
let dir = `tmp/exports/${exportId}/csv_files_tranformed/${m.type}`;
let fname = `${dir}/${m.sname}`;
fs.mkdirSync(dir,{recursive: true}, (err) => {
if(err) throw err;
});
tempm = m;
await pipeline(
fs.createReadStream(m.path),
csv.parse({delimiter: '\t', columns: true}),
csv.transform((input) => {
return input;
}),
csv.stringify({header: true, delimiter: '\t'}),
fs.createWriteStream(fname, {encoding: 'utf16le'}),
(err) => {
if (err) {
console.error('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
}
)
}
}
如何确保管道在进入下一个语句之前完全完成。
谢谢。
您应该提供 pipeline
函数的来源。它可能是第三方库或来自节点的 stream
如果它是 stream.pipeline
而不是它的 return 是流,那么 await pipeline(...
不会等待,因为它不是 Promise
。您可以使用 util, Reference
Promise
const util = require('util');
const { pipeline } = require('stream');
const pipelinePromise = util.promisify(pipeline);
// ...
await pipelinePromise();