带有 React 自定义钩子的热流订阅
Hot stream subscription with react custom hook
我需要在 React 自定义钩子中订阅一个热点流。这意味着 Ajax 请求未完成。它不停地从服务器接收数据块(格式良好JSON)。
我的源代码如下所示:
export const useQData = (
resourceId: string,
): {
qDataResult: Result<QData>;
} => {
const [qData, setQData] = useState<Result<QData>>({ status: "initial" });
function handleQData(qData: Result<QData>) {
setQData(qData);
}
// get qData initially
useEffect(() => {
debugger;
const observable = ajax
.getJSON<QData>(`${process.env.REACT_APP_API_BASE}/subscription/msnOutput/${resourceId}`)
.pipe(startWith(<QData>{ tileStatus: "NOTILE" }));
const c = connectable<QData>(observable);
c.connect();
const subscription = c.subscribe({
next: (response) => handleQData({ status: "update", payload: response }),
error: (error) => handleQData({ status: "error", responseErrorCode: 404, messageId: error.message }),
complete: () => handleQData({ status: "offline" }), // stream completed
});
return () => subscription.unsubscribe();
}, []);
return {
qDataResult: qData,
};
};
由于某种原因,流没有开始。即使 startWith()
设置的初始值也不会被处理。我是否错过了开始处理的内容?
更新:
通过为 Ajax 请求设置 includeDownloadProgress: true
,我已经实现了对 运行 的处理。这是源代码:
useEffect(() => {
observable = ajax<string>({
url: `${process.env.REACT_APP_API_BASE}/subscription/msnOutput/${resourceId}`,
includeDownloadProgress: true,
responseType: "text",
}).pipe(
// Extract response body from response object
map((response) => response.response),
filter((responseString) => responseString !== ""),
// Extract response objects
map((responseString) => responseString.split("\n")),
filter((responseArray) => responseArray.length > 0),
// Find last valid qdata object
map((responseArray) => extractValidQDataString(responseArray)),
filter((qDataString) => qDataString !== ""),
map((qDataString) => JSON.parse(qDataString) as QData),
startWith(<QData>{ tileStatus: "NOTILE" }),
);
const c = connectable(observable);
const s = c.subscribe({
next: (response) => setQData({ status: "update", payload: response }),
error: (error) => handleError(error),
complete: () => setQData({ status: "offline" }), // stream completed
});
c.connect();
setSubscription(s);
return () => s.unsubscribe();
}, []);
不幸的是,下载进度响应的结果有几个缺点:
- 它包含越来越多的回复
- 无法直接将响应检索为 JSON
有谁知道如何解决这些缺点?
It contains a growing array of responses
核心问题是 ajax()
发出的 AjaxResponse
(每次都是同一个对象)有一个 response
属性 grows with each chunk。我猜这个长轮询是为了保持对旧浏览器的支持?
如果没有,我建议改用 Fetch API - 我刚才链接的文章有详细解释。
const streamResponseBody = (url: string): Observable<string> =>
from(fetch(url)).pipe(
op.concatMap((resp) => {
const reader = resp.body?.getReader();
if (!reader) {
return throwError(() => new Error('response had no body to read'));
}
const encoder = new TextDecoder();
return defer(() => reader.read()).pipe(
op.repeat(),
op.takeWhile(({ done }) => !done),
op.map(({ value }) => encoder.decode(value)),
op.filter((value) => value !== '')
);
})
);
现在我们有一个块流,但我们需要一个行流,以便我们可以轻松地将它们转换为 QData
。阅读您的代码,似乎可以安全地假设每个 newline-separated 字符串可能 parse-able 为 QData
。此运算符缓冲块并在块到达时发出完整的行。
const chunksToLines: MonoTypeOperatorFunction<string> = pipe(
op.scan(
(acc, chunk) => {
const lines = acc.buffer.concat(chunk).split('\n');
return {
lines: lines.slice(0, lines.length - 1),
buffer: lines[lines.length - 1],
};
},
{ buffer: '', lines: [] as readonly string[] }
),
op.concatMap(({ lines }) => lines)
);
The response cannot be retrieved as JSON directly
是的,这是不可避免的。 rxjs
导出不会直接与您的特定 long-polling 实现集成。但是你的代码仍然可以到达一个非常干净的地方:
// This function is a combination of extractValidQDataString and
// JSON.parse(qDataString), except it doesn't have to handle arrays.
declare const stringToQData: (input: string) => QData | undefined;
useEffect(() => {
const s = streamResponseBody(
`${process.env.REACT_APP_API_BASE}/subscription/msnOutput/${resourceId}`
)
.pipe(
chunksToLines,
op.map(stringToQData),
op.filter((value): value is QData => !!value),
op.startWith(<QData>{ tileStatus: 'NOTILE' })
)
.subscribe({
next: (response) => setQData({ status: 'update', payload: response }),
error: (error) => handleError(error),
complete: () => setQData({ status: 'offline' }), // stream completed
});
setSubscription(s);
return () => s.unsubscribe();
}, []);
另一件值得指出的事情:所写的例子没有利用“热”观察。如果不使用 connect
并直接订阅,功能将是相同的。这是因为连接的可观察对象在 useEffect
回调之外不可访问,因此不能接受额外的订阅者。
我需要在 React 自定义钩子中订阅一个热点流。这意味着 Ajax 请求未完成。它不停地从服务器接收数据块(格式良好JSON)。
我的源代码如下所示:
export const useQData = (
resourceId: string,
): {
qDataResult: Result<QData>;
} => {
const [qData, setQData] = useState<Result<QData>>({ status: "initial" });
function handleQData(qData: Result<QData>) {
setQData(qData);
}
// get qData initially
useEffect(() => {
debugger;
const observable = ajax
.getJSON<QData>(`${process.env.REACT_APP_API_BASE}/subscription/msnOutput/${resourceId}`)
.pipe(startWith(<QData>{ tileStatus: "NOTILE" }));
const c = connectable<QData>(observable);
c.connect();
const subscription = c.subscribe({
next: (response) => handleQData({ status: "update", payload: response }),
error: (error) => handleQData({ status: "error", responseErrorCode: 404, messageId: error.message }),
complete: () => handleQData({ status: "offline" }), // stream completed
});
return () => subscription.unsubscribe();
}, []);
return {
qDataResult: qData,
};
};
由于某种原因,流没有开始。即使 startWith()
设置的初始值也不会被处理。我是否错过了开始处理的内容?
更新:
通过为 Ajax 请求设置 includeDownloadProgress: true
,我已经实现了对 运行 的处理。这是源代码:
useEffect(() => {
observable = ajax<string>({
url: `${process.env.REACT_APP_API_BASE}/subscription/msnOutput/${resourceId}`,
includeDownloadProgress: true,
responseType: "text",
}).pipe(
// Extract response body from response object
map((response) => response.response),
filter((responseString) => responseString !== ""),
// Extract response objects
map((responseString) => responseString.split("\n")),
filter((responseArray) => responseArray.length > 0),
// Find last valid qdata object
map((responseArray) => extractValidQDataString(responseArray)),
filter((qDataString) => qDataString !== ""),
map((qDataString) => JSON.parse(qDataString) as QData),
startWith(<QData>{ tileStatus: "NOTILE" }),
);
const c = connectable(observable);
const s = c.subscribe({
next: (response) => setQData({ status: "update", payload: response }),
error: (error) => handleError(error),
complete: () => setQData({ status: "offline" }), // stream completed
});
c.connect();
setSubscription(s);
return () => s.unsubscribe();
}, []);
不幸的是,下载进度响应的结果有几个缺点:
- 它包含越来越多的回复
- 无法直接将响应检索为 JSON 有谁知道如何解决这些缺点?
It contains a growing array of responses
核心问题是 ajax()
发出的 AjaxResponse
(每次都是同一个对象)有一个 response
属性 grows with each chunk。我猜这个长轮询是为了保持对旧浏览器的支持?
如果没有,我建议改用 Fetch API - 我刚才链接的文章有详细解释。
const streamResponseBody = (url: string): Observable<string> =>
from(fetch(url)).pipe(
op.concatMap((resp) => {
const reader = resp.body?.getReader();
if (!reader) {
return throwError(() => new Error('response had no body to read'));
}
const encoder = new TextDecoder();
return defer(() => reader.read()).pipe(
op.repeat(),
op.takeWhile(({ done }) => !done),
op.map(({ value }) => encoder.decode(value)),
op.filter((value) => value !== '')
);
})
);
现在我们有一个块流,但我们需要一个行流,以便我们可以轻松地将它们转换为 QData
。阅读您的代码,似乎可以安全地假设每个 newline-separated 字符串可能 parse-able 为 QData
。此运算符缓冲块并在块到达时发出完整的行。
const chunksToLines: MonoTypeOperatorFunction<string> = pipe(
op.scan(
(acc, chunk) => {
const lines = acc.buffer.concat(chunk).split('\n');
return {
lines: lines.slice(0, lines.length - 1),
buffer: lines[lines.length - 1],
};
},
{ buffer: '', lines: [] as readonly string[] }
),
op.concatMap(({ lines }) => lines)
);
The response cannot be retrieved as JSON directly
是的,这是不可避免的。 rxjs
导出不会直接与您的特定 long-polling 实现集成。但是你的代码仍然可以到达一个非常干净的地方:
// This function is a combination of extractValidQDataString and
// JSON.parse(qDataString), except it doesn't have to handle arrays.
declare const stringToQData: (input: string) => QData | undefined;
useEffect(() => {
const s = streamResponseBody(
`${process.env.REACT_APP_API_BASE}/subscription/msnOutput/${resourceId}`
)
.pipe(
chunksToLines,
op.map(stringToQData),
op.filter((value): value is QData => !!value),
op.startWith(<QData>{ tileStatus: 'NOTILE' })
)
.subscribe({
next: (response) => setQData({ status: 'update', payload: response }),
error: (error) => handleError(error),
complete: () => setQData({ status: 'offline' }), // stream completed
});
setSubscription(s);
return () => s.unsubscribe();
}, []);
另一件值得指出的事情:所写的例子没有利用“热”观察。如果不使用 connect
并直接订阅,功能将是相同的。这是因为连接的可观察对象在 useEffect
回调之外不可访问,因此不能接受额外的订阅者。