RxJS - 保留列表缓存并更新现有列表

RxJS - Keep a cache of the list and update the existing ones

我有一个 WebSocket 连接,可以做两件事:

我想获取该流,对其进行处理,保留项目缓存并执行以下操作:

这就是我到目前为止所做的。这并不多。我每次都附加这些项目。任何帮助将不胜感激:

function createCachedList$<T extends WSMessage<T>>(observable$: Observable<T>) {
  const INITIAL_STATE: any[] = [];

  const [fromDataPackets$, fromNonDataPackets$] = partition(
    observable$,
    (value) => value.type === WSMessageType.DATA
  );

  const pickDataPacket = fromDataPackets$.pipe(
    map((value: any) => value?.data),
    scan((prevState, currState: any[]) => {
      const nextState = R.uniq([...prevState, ...currState]);
      return [...prevState, ...nextState];
    }, INITIAL_STATE),
    tap((data: any) => console.log('Data:', data)),
    map((data: any) => ({ type: WSMessageType.DATA, data }))
  );

  return merge(pickDataPacket, fromNonDataPackets$);
}

export default createCachedList$;

您的代码似乎没问题。 scan 是我会使用的运算符。

您可能需要详细说明 scan 中的逻辑。这样的事情可能会有所帮助

scan((prevState, currState: any[]) => {
  currState.forEach(m => {
    const item = prevState.find(s => s.id === m.id);
    if (item) {
       Object.assign(item, m)
    } else {
       prevState.push(m)
    }
  });
  return prevState;
}, INITIAL_STATE),