RxJS - 保留列表缓存并更新现有列表
RxJS - Keep a cache of the list and update the existing ones
我有一个 WebSocket 连接,可以做两件事:
- 在第一个 'DATA' 事件中发送完整列表(即:5 个项目)
- 在接下来的每个 'DATA' 中,它仅发送有关更新的信息。
我想获取该流,对其进行处理,保留项目缓存并执行以下操作:
- 保留现有列表。
- 如果有新事件到达并在列表中,则根据 id 更新它(这应该足够通用)。
- 如果不存在,请将其添加到列表中。
这就是我到目前为止所做的。这并不多。我每次都附加这些项目。任何帮助将不胜感激:
function createCachedList$<T extends WSMessage<T>>(observable$: Observable<T>) {
const INITIAL_STATE: any[] = [];
const [fromDataPackets$, fromNonDataPackets$] = partition(
observable$,
(value) => value.type === WSMessageType.DATA
);
const pickDataPacket = fromDataPackets$.pipe(
map((value: any) => value?.data),
scan((prevState, currState: any[]) => {
const nextState = R.uniq([...prevState, ...currState]);
return [...prevState, ...nextState];
}, INITIAL_STATE),
tap((data: any) => console.log('Data:', data)),
map((data: any) => ({ type: WSMessageType.DATA, data }))
);
return merge(pickDataPacket, fromNonDataPackets$);
}
export default createCachedList$;
您的代码似乎没问题。 scan
是我会使用的运算符。
您可能需要详细说明 scan
中的逻辑。这样的事情可能会有所帮助
scan((prevState, currState: any[]) => {
currState.forEach(m => {
const item = prevState.find(s => s.id === m.id);
if (item) {
Object.assign(item, m)
} else {
prevState.push(m)
}
});
return prevState;
}, INITIAL_STATE),
我有一个 WebSocket 连接,可以做两件事:
- 在第一个 'DATA' 事件中发送完整列表(即:5 个项目)
- 在接下来的每个 'DATA' 中,它仅发送有关更新的信息。
我想获取该流,对其进行处理,保留项目缓存并执行以下操作:
- 保留现有列表。
- 如果有新事件到达并在列表中,则根据 id 更新它(这应该足够通用)。
- 如果不存在,请将其添加到列表中。
这就是我到目前为止所做的。这并不多。我每次都附加这些项目。任何帮助将不胜感激:
function createCachedList$<T extends WSMessage<T>>(observable$: Observable<T>) {
const INITIAL_STATE: any[] = [];
const [fromDataPackets$, fromNonDataPackets$] = partition(
observable$,
(value) => value.type === WSMessageType.DATA
);
const pickDataPacket = fromDataPackets$.pipe(
map((value: any) => value?.data),
scan((prevState, currState: any[]) => {
const nextState = R.uniq([...prevState, ...currState]);
return [...prevState, ...nextState];
}, INITIAL_STATE),
tap((data: any) => console.log('Data:', data)),
map((data: any) => ({ type: WSMessageType.DATA, data }))
);
return merge(pickDataPacket, fromNonDataPackets$);
}
export default createCachedList$;
您的代码似乎没问题。 scan
是我会使用的运算符。
您可能需要详细说明 scan
中的逻辑。这样的事情可能会有所帮助
scan((prevState, currState: any[]) => {
currState.forEach(m => {
const item = prevState.find(s => s.id === m.id);
if (item) {
Object.assign(item, m)
} else {
prevState.push(m)
}
});
return prevState;
}, INITIAL_STATE),