带有 React 自定义钩子的热流订阅

Hot stream subscription with react custom hook

我需要在 React 自定义钩子中订阅一个热点流。这意味着 Ajax 请求未完成。它不停地从服务器接收数据块(格式良好JSON)。

我的源代码如下所示:

export const useQData = (
  resourceId: string,
): {
  qDataResult: Result<QData>;
} => {
  const [qData, setQData] = useState<Result<QData>>({ status: "initial" });

  function handleQData(qData: Result<QData>) {
    setQData(qData);
  }

  // get qData initially
  useEffect(() => {
    debugger;
    const observable = ajax
      .getJSON<QData>(`${process.env.REACT_APP_API_BASE}/subscription/msnOutput/${resourceId}`)
      .pipe(startWith(<QData>{ tileStatus: "NOTILE" }));
    const c = connectable<QData>(observable);
    c.connect();
    const subscription = c.subscribe({
      next: (response) => handleQData({ status: "update", payload: response }),
      error: (error) => handleQData({ status: "error", responseErrorCode: 404, messageId: error.message }),
      complete: () => handleQData({ status: "offline" }), // stream completed
    });
    return () => subscription.unsubscribe();
  }, []);

  return {
    qDataResult: qData,
  };
};

由于某种原因,流没有开始。即使 startWith() 设置的初始值也不会被处理。我是否错过了开始处理的内容?

更新:

通过为 Ajax 请求设置 includeDownloadProgress: true,我已经实现了对 运行 的处理。这是源代码:

useEffect(() => {
    observable = ajax<string>({
        url: `${process.env.REACT_APP_API_BASE}/subscription/msnOutput/${resourceId}`,
        includeDownloadProgress: true,
        responseType: "text",
      }).pipe(
        // Extract response body from response object
        map((response) => response.response),
        filter((responseString) => responseString !== ""),
        // Extract response objects
        map((responseString) => responseString.split("\n")),
        filter((responseArray) => responseArray.length > 0),
        // Find last valid qdata object
        map((responseArray) => extractValidQDataString(responseArray)),
        filter((qDataString) => qDataString !== ""),
        map((qDataString) => JSON.parse(qDataString) as QData),
        startWith(<QData>{ tileStatus: "NOTILE" }),
      );
    const c = connectable(observable);
    const s = c.subscribe({
      next: (response) => setQData({ status: "update", payload: response }),
      error: (error) => handleError(error),
      complete: () => setQData({ status: "offline" }), // stream completed
    });
    c.connect();
    setSubscription(s);
    return () => s.unsubscribe();
  }, []);

不幸的是,下载进度响应的结果有几个缺点:

It contains a growing array of responses

核心问题是 ajax() 发出的 AjaxResponse(每次都是同一个对象)有一个 response 属性 grows with each chunk。我猜这个长轮询是为了保持对旧浏览器的支持?

如果没有,我建议改用 Fetch API - 我刚才链接的文章有详细解释。

const streamResponseBody = (url: string): Observable<string> =>
  from(fetch(url)).pipe(
    op.concatMap((resp) => {
      const reader = resp.body?.getReader();
      if (!reader) {
        return throwError(() => new Error('response had no body to read'));
      }
      const encoder = new TextDecoder();
      return defer(() => reader.read()).pipe(
        op.repeat(),
        op.takeWhile(({ done }) => !done),
        op.map(({ value }) => encoder.decode(value)),
        op.filter((value) => value !== '')
      );
    })
  );

现在我们有一个块流,但我们需要一个行流,以便我们可以轻松地将它们转换为 QData。阅读您的代码,似乎可以安全地假设每个 newline-separated 字符串可能 parse-able 为 QData。此运算符缓冲块并在块到达时发出完整的行。

const chunksToLines: MonoTypeOperatorFunction<string> = pipe(
  op.scan(
    (acc, chunk) => {
      const lines = acc.buffer.concat(chunk).split('\n');
      return {
        lines: lines.slice(0, lines.length - 1),
        buffer: lines[lines.length - 1],
      };
    },
    { buffer: '', lines: [] as readonly string[] }
  ),
  op.concatMap(({ lines }) => lines)
);

The response cannot be retrieved as JSON directly

是的,这是不可避免的。 rxjs 导出不会直接与您的特定 long-polling 实现集成。但是你的代码仍然可以到达一个非常干净的地方:

// This function is a combination of extractValidQDataString and
// JSON.parse(qDataString), except it doesn't have to handle arrays.
declare const stringToQData: (input: string) => QData | undefined;

useEffect(() => {
 const s = streamResponseBody(
   `${process.env.REACT_APP_API_BASE}/subscription/msnOutput/${resourceId}`
 )
   .pipe(
     chunksToLines,
     op.map(stringToQData),
     op.filter((value): value is QData => !!value),
     op.startWith(<QData>{ tileStatus: 'NOTILE' })
   )
   .subscribe({
     next: (response) => setQData({ status: 'update', payload: response }),
     error: (error) => handleError(error),
     complete: () => setQData({ status: 'offline' }), // stream completed
   });
 setSubscription(s);
 return () => s.unsubscribe();
}, []);

另一件值得指出的事情:所写的例子没有利用“热”观察。如果不使用 connect 并直接订阅,功能将是相同的。这是因为连接的可观察对象在 useEffect 回调之外不可访问,因此不能接受额外的订阅者。