承诺组合流
Promise combine stream
我想逐行读取大文件并将数据插入数据库存储。
我的函数 return 它里面的 Promise 创建了流,并在调用事件 stream.on('end')
时解析它,但这不是我真正想要的,因为在 stream.on('data')
中它产生 Promise.map()
每一行,我想确定,所有插入操作都在 resolve()
调用之前完成。在这种情况下我如何生产右链?
var loadFromFile = (options) => new Promise(function (resolve, reject) {
let stream = fs.createReadStream(options.filePath, {
flags: 'r',
});
stream.on('data', (chunk) => {
/* process data chunk to string Array */
Promise.map(lines, (line) => {
/* process and insert each line here */
})
.catch((err)=>{
reject(err);
});
});
stream.on('end', () => {
if (someBuisnessLogicCheckHere) {
reject('Invalid data was found in file');
}
resolve(); // Here I am not sure, that all inserts for each chunk in Promise.map was completed
});
});
如果您想确保在映射操作中的所有承诺都已解决之前不解决,请等待 Promise.map
returns 的承诺;看评论:
var loadFromFile = (options) => new Promise(function(resolve, reject) {
let stream = fs.createReadStream(options.filePath, {
flags: 'r',
});
let promises = []; // Array of promises we'll wait for
stream.on('data', (chunk) => {
promises.push( // Remember this promise
Promise.map(lines, (line) => {
/* process and insert each line here */
})
.catch((err) => {
reject(err);
})
);
});
stream.on('end', () => {
if (someBuisnessLogicCheckHere) {
reject('Invalid data was found in file');
}
Promise.all(promises).then(() => { resolve()}); // Wait for it before resolving
});
});
请注意,我不只是做 Promise.all(promises).then(resolve);
,因为这会将一组分辨率传递给 resolve
,您的原始代码没有与调用者共享这些分辨率。
如果数组有可能在充满大多数已解决的承诺时变得相当大,您可以在它们解决时主动删除它们,只等待最后剩下的那些。
如果您需要在管道方法中使用 pause/resume 的自动化机制处理文件,请查看 scramjet。
您的示例看起来会更清晰,请参阅:
var loadFromFile = (options) =>
fs.createReadStream(options.filePath, {
flags: 'r',
})
.split(/\r?\n/)
// split by line by regex
.parse((line) => aPromiseReturningFunction(line))
// will wait until promises are resolved
.map((data) => doSomeAsyncAndReturnPromise(data))
// will wait as above
.accumulate((acc, resolved) => acc.push(resolved), [])
.then((arr) => {
if (someBuisnessLogicCheckHere) {
return Promise.reject('Invalid data was found in file');
}
// Here all your items are saved (i.e. all Promises resolved)
});
});
我想逐行读取大文件并将数据插入数据库存储。
我的函数 return 它里面的 Promise 创建了流,并在调用事件 stream.on('end')
时解析它,但这不是我真正想要的,因为在 stream.on('data')
中它产生 Promise.map()
每一行,我想确定,所有插入操作都在 resolve()
调用之前完成。在这种情况下我如何生产右链?
var loadFromFile = (options) => new Promise(function (resolve, reject) {
let stream = fs.createReadStream(options.filePath, {
flags: 'r',
});
stream.on('data', (chunk) => {
/* process data chunk to string Array */
Promise.map(lines, (line) => {
/* process and insert each line here */
})
.catch((err)=>{
reject(err);
});
});
stream.on('end', () => {
if (someBuisnessLogicCheckHere) {
reject('Invalid data was found in file');
}
resolve(); // Here I am not sure, that all inserts for each chunk in Promise.map was completed
});
});
如果您想确保在映射操作中的所有承诺都已解决之前不解决,请等待 Promise.map
returns 的承诺;看评论:
var loadFromFile = (options) => new Promise(function(resolve, reject) {
let stream = fs.createReadStream(options.filePath, {
flags: 'r',
});
let promises = []; // Array of promises we'll wait for
stream.on('data', (chunk) => {
promises.push( // Remember this promise
Promise.map(lines, (line) => {
/* process and insert each line here */
})
.catch((err) => {
reject(err);
})
);
});
stream.on('end', () => {
if (someBuisnessLogicCheckHere) {
reject('Invalid data was found in file');
}
Promise.all(promises).then(() => { resolve()}); // Wait for it before resolving
});
});
请注意,我不只是做 Promise.all(promises).then(resolve);
,因为这会将一组分辨率传递给 resolve
,您的原始代码没有与调用者共享这些分辨率。
如果数组有可能在充满大多数已解决的承诺时变得相当大,您可以在它们解决时主动删除它们,只等待最后剩下的那些。
如果您需要在管道方法中使用 pause/resume 的自动化机制处理文件,请查看 scramjet。
您的示例看起来会更清晰,请参阅:
var loadFromFile = (options) =>
fs.createReadStream(options.filePath, {
flags: 'r',
})
.split(/\r?\n/)
// split by line by regex
.parse((line) => aPromiseReturningFunction(line))
// will wait until promises are resolved
.map((data) => doSomeAsyncAndReturnPromise(data))
// will wait as above
.accumulate((acc, resolved) => acc.push(resolved), [])
.then((arr) => {
if (someBuisnessLogicCheckHere) {
return Promise.reject('Invalid data was found in file');
}
// Here all your items are saved (i.e. all Promises resolved)
});
});