Express.js:来自 Mongo 的流结果并以非阻塞方式序列化
Express.js: stream result from Mongo and serialize non-blockingly
我正在尝试做的是更新 this answer to not depend on the JSONStream library,它已不再维护。我们有:
Comment.find()
.cursor()
.pipe(JSONStream.stringify())
.pipe(res.type('json'))
它使用的是 Mongoose .cursor()
,returns 和 Node.js-compatible readable stream,但我愿意使用本机 mongo 驱动程序。
现在,我的第一个问题:是否有人仍在使用 Node.js 流,或者您现在应该使用 JavaScript 迭代器和生成器?
如果是这样,我想我将能够 convert the cursor to an iterator 并将每个块分别转换为 JSON。 (虽然进行错误处理等的图书馆建议是受欢迎的,即使这里离题而不是这个问题的核心)。
但是如何使迭代器流成为 express.js 结果?
我找不到任何关于它的文档(虽然也找不到关于 res
是可写流的文档,尽管它有效。)我在这里的想法是否正确?
编辑:
与此同时,我做了更多研究并发现了以下库:
- pull-stream – 没有 TypeScript 类型,但有 JSONStream 的创建者作为贡献者
- ts-stream – with no built-in node stream conversion
- scramjet – 看起来很有趣……也许有点矫枉过正,但至少为他们的流提供了
map
和 filter
等的实现..?
编辑:添加了自定义字符串化步骤。
ExpressJS 中的 res
对象是 http.ServerResponse 的可写子类,可以是管道数据。
我倾向于使用 NodeJS 的内置支持将迭代器转换为可读,并使用 stream.pipeline
进行异常处理来连接此数据流。
Note that it's no longer necessary to convert the cursor to a readable in NodeJS v13+, as stream.pipeline now accepts async iterators in place of a stream.
Note that it is redundant to use stringify()
if it is possible to use Mongoose's lean() directly. Lean will emit JSON data.
import stream from "stream";
import util from "util";
function handler(req, res, next){
try {
// init the cursor
const cursor = Comment.find().lean(); // "lean" will emit json data
const readable = stream.Readable.from( cursor );
// promisifying the pipeline will make it throw on errors
await util.promisify(stream.pipeline)( readable, res.type('json') );
next();
}
catch( error ){
next( error );
}
}
在 NodeJS v13+ 中使用自定义字符串化:
import stream from "stream";
import util from "util";
function handler(req, res, next){
try {
// init the cursor
const cursor = Comment.find().lean(); // "lean" will emit json data
const readable = stream.Readable.from( cursor );
// promisifying the pipeline will make it throw on errors
await util.promisify(stream.pipeline)(
readable,
// Custom "stringifying" using an async iterator
async function*( source ){
// Add some output before the result from mongodb. Typically, this could be information about
// the number of results in a REST API.
yield "Appended"
for await (const comment of source ){
// Emit a "projection" of the data retrieved from MongoDB
yield {
text: comment.text,
// Add a new property:
newProperty: true
}
}
// Add some final data to the response. In a REST API, this might be the closing bracket of an array "]".
yield "Prended"
},
// the stringified data is then piped to express' res object
res.type('json')
);
next();
}
catch( error ){
next( error );
}
}
我正在尝试做的是更新 this answer to not depend on the JSONStream library,它已不再维护。我们有:
Comment.find()
.cursor()
.pipe(JSONStream.stringify())
.pipe(res.type('json'))
它使用的是 Mongoose .cursor()
,returns 和 Node.js-compatible readable stream,但我愿意使用本机 mongo 驱动程序。
现在,我的第一个问题:是否有人仍在使用 Node.js 流,或者您现在应该使用 JavaScript 迭代器和生成器?
如果是这样,我想我将能够 convert the cursor to an iterator 并将每个块分别转换为 JSON。 (虽然进行错误处理等的图书馆建议是受欢迎的,即使这里离题而不是这个问题的核心)。
但是如何使迭代器流成为 express.js 结果?
我找不到任何关于它的文档(虽然也找不到关于 res
是可写流的文档,尽管它有效。)我在这里的想法是否正确?
编辑:
与此同时,我做了更多研究并发现了以下库:
- pull-stream – 没有 TypeScript 类型,但有 JSONStream 的创建者作为贡献者
- ts-stream – with no built-in node stream conversion
- scramjet – 看起来很有趣……也许有点矫枉过正,但至少为他们的流提供了
map
和filter
等的实现..?
编辑:添加了自定义字符串化步骤。
ExpressJS 中的 res
对象是 http.ServerResponse 的可写子类,可以是管道数据。
我倾向于使用 NodeJS 的内置支持将迭代器转换为可读,并使用 stream.pipeline
进行异常处理来连接此数据流。
Note that it's no longer necessary to convert the cursor to a readable in NodeJS v13+, as stream.pipeline now accepts async iterators in place of a stream.
Note that it is redundant to use
stringify()
if it is possible to use Mongoose's lean() directly. Lean will emit JSON data.
import stream from "stream";
import util from "util";
function handler(req, res, next){
try {
// init the cursor
const cursor = Comment.find().lean(); // "lean" will emit json data
const readable = stream.Readable.from( cursor );
// promisifying the pipeline will make it throw on errors
await util.promisify(stream.pipeline)( readable, res.type('json') );
next();
}
catch( error ){
next( error );
}
}
在 NodeJS v13+ 中使用自定义字符串化:
import stream from "stream";
import util from "util";
function handler(req, res, next){
try {
// init the cursor
const cursor = Comment.find().lean(); // "lean" will emit json data
const readable = stream.Readable.from( cursor );
// promisifying the pipeline will make it throw on errors
await util.promisify(stream.pipeline)(
readable,
// Custom "stringifying" using an async iterator
async function*( source ){
// Add some output before the result from mongodb. Typically, this could be information about
// the number of results in a REST API.
yield "Appended"
for await (const comment of source ){
// Emit a "projection" of the data retrieved from MongoDB
yield {
text: comment.text,
// Add a new property:
newProperty: true
}
}
// Add some final data to the response. In a REST API, this might be the closing bracket of an array "]".
yield "Prended"
},
// the stringified data is then piped to express' res object
res.type('json')
);
next();
}
catch( error ){
next( error );
}
}