如何使用 fetch 处理流式数据?

How to handle streaming data using fetch?

我使用 async for 在处理来自 node.js 进程的输出流方面取得了巨大成功,但我正在努力获得我希望可以 "just work" 使用浏览器 [=12] 的东西=] API.

这非常适合异步处理来自进程的输出流块:

for await (const out of proc.child.stdout) {
  ...
}

(当然是在异步函数上下文中)

我试图在浏览器中做类似的事情,我想在浏览器中访问从服务器发送给我的数据。

for await (const chunk of (await fetch('/data.jsonl')).body) {
  console.log('got', chunk);
}

这在 Chrome (Uncaught TypeError: (intermediate value).body is not async iterable) 中不起作用。

对于我的用例,这不是必需的,所以我现在只是在我的客户端代码中使用 let data = await (await fetch(datapath)).text();。这类似于在等待的提取中使用 .json() 而不是 .text() 的典型用法,因此在浏览器收到整个响应之前不会开始处理。由于显而易见的原因,这并不理想。

我在看 Oboe.js (I think the relevant impl is somewhere near here),它几乎可以解决这个问题,但它的内部结构相当丑陋,所以看起来这可能是目前唯一的方法吗?

如果没有实现异步迭代(意味着还不能使用 async for),是否有另一种实际使用 ReadableStream 的方法?

我认为 2020 年年中的当前情况是 async for 还不能在 fetch 主体上工作。

https://github.com/whatwg/streams/issues/778 此问题似乎存在浏览器跟踪错误,其中 none 已实现该功能。

我目前不知道还有其他方法可以利用 fetch 提供的 .body ReadableStream。

完成问题中隐含的任务的标准方法是使用 websocket。

根据规范,ReadableStream 如 fetch-API 的 Response.body does have a getIterator method。由于某种原因,它本身不是异步迭代的,您必须明确调用该方法:

const response = await fetch('/data.json');
if (!response.ok)
    throw new Error(await response.text());
for await (const chunk of response.body.getIterator()) {
    console.log('got', chunk);
}

不幸的是,异步迭代支持尚未实现,尽管在规范中。相反,您可以手动迭代,如 this example from the spec 所示。 (我会在这个答案中为您将示例转换为 async/await。)

const reader = response.body.getReader();
const { value, done } = await reader.read();

if (done) {
  console.log("The stream was already closed!");
} else {
  console.log(value);
}

您可以使用递归或循环来重复执行此操作,如 this other example:

async function readAllChunks(readableStream) {
  const reader = readableStream.getReader();
  const chunks = [];
  
  let done, value;
  while (!done) {
    ({ value, done } = await reader.read());
    if (done) {
      return chunks;
    }
    chunks.push(value);
  }
}

console.log(await readAllChunks(response.body));