Express.js:来自 Mongo 的流结果并以非阻塞方式序列化

Express.js: stream result from Mongo and serialize non-blockingly

我正在尝试做的是更新 this answer to not depend on the JSONStream library,它已不再维护。我们有:

Comment.find()
  .cursor()
  .pipe(JSONStream.stringify())
  .pipe(res.type('json'))

它使用的是 Mongoose .cursor(),returns 和 Node.js-compatible readable stream,但我愿意使用本机 mongo 驱动程序。

现在,我的第一个问题:是否有人仍在使用 Node.js 流,或者您现在应该使用 JavaScript 迭代器和生成器?

如果是这样,我想我将能够 convert the cursor to an iterator 并将每个块分别转换为 JSON。 (虽然进行错误处理等的图书馆建议是受欢迎的,即使这里离题而不是这个问题的核心)。

但是如何使迭代器流成为 express.js 结果?

我找不到任何关于它的文档(虽然也找不到关于 res 是可写流的文档,尽管它有效。)我在这里的想法是否正确?

编辑:

与此同时,我做了更多研究并发现了以下库:

编辑:添加了自定义字符串化步骤。

ExpressJS 中的 res 对象是 http.ServerResponse 的可写子类,可以是管道数据。

我倾向于使用 NodeJS 的内置支持将迭代器转换为可读,并使用 stream.pipeline 进行异常处理来连接此数据流。

Note that it's no longer necessary to convert the cursor to a readable in NodeJS v13+, as stream.pipeline now accepts async iterators in place of a stream.

Note that it is redundant to use stringify() if it is possible to use Mongoose's lean() directly. Lean will emit JSON data.

import stream from "stream";
import util from "util";

function handler(req, res, next){
  try {
    // init the cursor
    const cursor = Comment.find().lean(); // "lean" will emit json data
    const readable = stream.Readable.from( cursor );
    // promisifying the pipeline will make it throw on errors
    await util.promisify(stream.pipeline)( readable, res.type('json') );
    next();
  }
  catch( error ){
    next( error );
  }
}

在 NodeJS v13+ 中使用自定义字符串化:


import stream from "stream";
import util from "util";

function handler(req, res, next){
  try {
    // init the cursor
    const cursor = Comment.find().lean(); // "lean" will emit json data
    const readable = stream.Readable.from( cursor );
    // promisifying the pipeline will make it throw on errors
    await util.promisify(stream.pipeline)( 
      readable,
      // Custom "stringifying" using an async iterator
      async function*( source ){
        // Add some output before the result from mongodb. Typically, this could be information about 
        // the number of results in a REST API.
        yield "Appended"

        for await (const comment of source ){
          // Emit a "projection" of the data retrieved from MongoDB 
          yield {
            text: comment.text,
            // Add a new property:
            newProperty: true
          }
        }
        // Add some final data to the response. In a REST API, this might be the closing bracket of an array "]".
        yield "Prended"
      },
      // the stringified data is then piped to express' res object
      res.type('json') 
    );
    next();
  }
  catch( error ){
    next( error );
  }
}