How to handle a situation with Node.js streams when one stream depends on another?

I'm working on a bulk upload feature and ran into the following problem.

I want to archive files that will be uploaded to my server. The archive will also contain a manifest file that describes each file with various properties/metadata/etc.

The problem arises when I want to send the response back: the stream reading the manifest file has already closed, which causes the callback to execute immediately. I'll show an example below.

const crypto = require("crypto");
const csv = require("fast-csv");
const fs = require("fs");
const path = require("path");

async function processUpload() {
  const manifestReadStream = fs.createReadStream(
    path.join(__dirname, "manifest.txt")
  );

  manifestReadStream
    .pipe(
      csv.parse({
        delimiter: ";",
      })
    )
    .on("data", async (row) => {
        // do processing for each file described in manifest file
      const hash = crypto.createHash("sha1");
      const rs = fs.createReadStream(targetFile, {
        flags: "r",
        autoClose: true,
      });
      rs.on("data", (data) => hash.update(data, "utf-8"));
      rs.on("close", function onReadStreamClose() {
        // do proccessing for file
      });
    })
    .on("end", async () => {
      // return response when all formating was performed
    });
}

Because of the nested read streams, the "end" handler runs before all the files have been processed. How can I solve this?

I recommend using async iterators; they make the code simpler and callback-free:

const csv = require("fast-csv");
const crypto = require("crypto");
const fs = require("fs");
const path = require("path");

async function processUpload() {
  const manifestReadStream = fs.createReadStream(
    path.join(__dirname, "manifest.txt")
  );

  const parserStream = manifestReadStream.pipe(
    csv.parse({
      delimiter: ";",
    })
  );

  for await (const row of parserStream) {
    // do processing for each file described in the manifest;
    // targetFile is derived from the row (resolution omitted here)
    const hash = crypto.createHash("sha1");
    const rs = fs.createReadStream(targetFile, {
      flags: "r",
      autoClose: true,
    });
    for await (const data of rs) {
      hash.update(data, "utf-8");
    }
    // DONE PROCESSING THE ROW
  }

  // DONE PROCESSING ALL FILES
  // return the response when all formatting has been performed
}
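
For completeness: both snippets create the hash but never finalize it. The per-file work can be factored into a small helper that consumes the stream with for await and then calls hash.digest() exactly once. A minimal sketch, assuming targetFile is the path resolved from the manifest row (hashFile is a hypothetical name, not part of the original code):

const crypto = require("crypto");
const fs = require("fs");

async function hashFile(targetFile) {
  const hash = crypto.createHash("sha1");
  // fs.ReadStream is async iterable (Node.js 10+), so the loop consumes
  // the file chunk by chunk without "data"/"close" callbacks
  for await (const chunk of fs.createReadStream(targetFile)) {
    hash.update(chunk);
  }
  // digest() may only be called once, after the stream is fully consumed
  return hash.digest("hex");
}

Inside the for await (const row of parserStream) loop you would then simply await hashFile(targetFile), and the code after the outer loop still reliably runs only once every file has been processed.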