如何管理流以处理 node.js 中的传入数据
How to manage a stream in order to process incoming data in node.js
对于文件中的每一行,我想执行一个计算密集型任务,例如图像压缩。我遇到的问题是数据进来的速度太快,使内存不堪重负。理想情况下,我希望能够在处理数据时暂停和继续流。
我最初尝试将 readline
模块与这样的文件流一起使用:
const fileStream = fs.createReadStream('long-list.txt')
const rl = readline.createInterface({ input: fileStream })
rl.on('line', (line, lineCount) => {
doTheHeavyTask(line)
})
然而,这很快就会用数千次调用 doTheHeavyTask()
。
来淹没内存
我决定将每一行推入队列并创建一个事件,在处理完上一行时使下一行出队:
const lineQ = new Queue() // From the 'queue-fifo' module
rl.on('line', (line, lineCount) => {
lineQ.enqueue(line)
})
const lineEmitter = new EventEmitter() // From the 'events' module
lineEmitter.on('processNextLine', async () => {
await doTheHeavyTask( lineQ.dequeue() )
if (!lineQ.isEmpty()) lineEmitter.emit('processNextLine')
})
setTimeout( () => lineEmitter.emit('processNextLine'), 20) // Give rl a moment to enqueue some lines
这行得通,但它看起来有点老套,而且并不比一次读取所有文件好多少。
我隐约知道 Javascript 中的 "backpressure" 和 "generators" 等概念,但我不确定如何应用它们。
这里的问题不是流本身,而是你触发的异步任务。每个任务(无论是带有闭包的回调还是异步函数)都会消耗内存。如果您同时启动多个(一千个)任务,那将使用资源。
您可以使用异步迭代器遍历这些行,并为每个行执行一个任务(并等待):
(async function () {
for await (const el of rl) {
await doHeavyTask(el);
}
})();
这将是正确的背压。
但是,它一次只一个任务,这可能会很慢。要缓冲一些元素并同时处理它们,您可以这样做:
const SIZE = 10; // to be tested with different values
(async function () {
let chunk = [];
for await(const el of rl) {
chunk.push(el);
if(chunk.length >= SIZE) {
await Promise.all(chunk.map(doHeavyTask));
chunk.length = 0;
}
}
await Promise.all(chunk.map(doHeavyTask));
})();
你需要 at least Node 11.14.0 才能正常工作。
对于文件中的每一行,我想执行一个计算密集型任务,例如图像压缩。我遇到的问题是数据进来的速度太快,使内存不堪重负。理想情况下,我希望能够在处理数据时暂停和继续流。
我最初尝试将 readline
模块与这样的文件流一起使用:
const fileStream = fs.createReadStream('long-list.txt')
const rl = readline.createInterface({ input: fileStream })
rl.on('line', (line, lineCount) => {
doTheHeavyTask(line)
})
然而,这很快就会用数千次调用 doTheHeavyTask()
。
我决定将每一行推入队列并创建一个事件,在处理完上一行时使下一行出队:
const lineQ = new Queue() // From the 'queue-fifo' module
rl.on('line', (line, lineCount) => {
lineQ.enqueue(line)
})
const lineEmitter = new EventEmitter() // From the 'events' module
lineEmitter.on('processNextLine', async () => {
await doTheHeavyTask( lineQ.dequeue() )
if (!lineQ.isEmpty()) lineEmitter.emit('processNextLine')
})
setTimeout( () => lineEmitter.emit('processNextLine'), 20) // Give rl a moment to enqueue some lines
这行得通,但它看起来有点老套,而且并不比一次读取所有文件好多少。
我隐约知道 Javascript 中的 "backpressure" 和 "generators" 等概念,但我不确定如何应用它们。
这里的问题不是流本身,而是你触发的异步任务。每个任务(无论是带有闭包的回调还是异步函数)都会消耗内存。如果您同时启动多个(一千个)任务,那将使用资源。
您可以使用异步迭代器遍历这些行,并为每个行执行一个任务(并等待):
(async function () {
for await (const el of rl) {
await doHeavyTask(el);
}
})();
这将是正确的背压。
但是,它一次只一个任务,这可能会很慢。要缓冲一些元素并同时处理它们,您可以这样做:
const SIZE = 10; // to be tested with different values
(async function () {
let chunk = [];
for await(const el of rl) {
chunk.push(el);
if(chunk.length >= SIZE) {
await Promise.all(chunk.map(doHeavyTask));
chunk.length = 0;
}
}
await Promise.all(chunk.map(doHeavyTask));
})();
你需要 at least Node 11.14.0 才能正常工作。