如何管理流以处理 node.js 中的传入数据

How to manage a stream in order to process incoming data in node.js

对于文件中的每一行,我想执行一个计算密集型任务,例如图像压缩。我遇到的问题是数据进来的速度太快,使内存不堪重负。理想情况下,我希望能够在处理数据时暂停和继续流。

我最初尝试将 readline 模块与这样的文件流一起使用:

const fileStream = fs.createReadStream('long-list.txt')
const rl = readline.createInterface({ input: fileStream })
rl.on('line', (line, lineCount) => {
   doTheHeavyTask(line)
})

然而,这很快就会用数千次调用 doTheHeavyTask()

来淹没内存

我决定将每一行推入队列并创建一个事件,在处理完上一行时使下一行出队:

const lineQ = new Queue() // From the 'queue-fifo' module
rl.on('line', (line, lineCount) => {
   lineQ.enqueue(line)
})
const lineEmitter = new EventEmitter() // From the 'events' module
lineEmitter.on('processNextLine', async () => {
    await doTheHeavyTask( lineQ.dequeue() )
    if (!lineQ.isEmpty()) lineEmitter.emit('processNextLine')
})
setTimeout( () => lineEmitter.emit('processNextLine'), 20) // Give rl a moment to enqueue some lines

这行得通,但它看起来有点老套,而且并不比一次读取所有文件好多少。

我隐约知道 Javascript 中的 "backpressure" 和 "generators" 等概念,但我不确定如何应用它们。

这里的问题不是流本身,而是你触发的异步任务。每个任务(无论是带有闭包的回调还是异步函数)都会消耗内存。如果您同时启动多个(一千个)任务,那将使用资源。

您可以使用异步迭代器遍历这些行,并为每个行执行一个任务(并等待):

 (async function () {
     for await (const el of rl) {
        await doHeavyTask(el);
     }
 })();

这将是正确的背压。

但是,它一次只一个任务,这可能会很慢。要缓冲一些元素并同时处理它们,您可以这样做:

 const SIZE = 10; // to be tested with different values

 (async function () {
   let chunk = [];
   for await(const el of rl) {
      chunk.push(el);
      if(chunk.length >= SIZE) {
         await Promise.all(chunk.map(doHeavyTask));
         chunk.length = 0;
      }
   }
   await Promise.all(chunk.map(doHeavyTask));
})();

你需要 at least Node 11.14.0 才能正常工作