如何链接 rxjs observables 以获取内部 observable 和 return 的价值 Angular

How to chain rxjs observables to get value of inner observable and return with parent in Angular

我向 firestore 发出请求以获取用户的聊天记录,它 returns 具有此形状的对象的可观察数组

[...,
  {
     key: '83hed87djhe09',
     participants: ['owner_43', 'visitor_69']
  },
...]

这会显示 UI 中所有用户聊天的列表,但我想按用户名搜索聊天。为此,我必须向后端服务器发出一个 http 请求以获取每个参与者的用户名,并在参与者线程中替换它以使其可按类型搜索。

所以 'owner_43' 会变成 'John Doe' 例如。

我遇到的问题是,我得到的是参与者姓名的可观察值数组,而不是字符串数组。

这是我的代码

this.chat$ = this.chatSvc.getUserChats(this.userUID).pipe(
  map((chats: any) => {
    return chats.map((chat: any) => {
      return {
        key: chat.key,
        participants: chat.participants.map((participant: any) => {
          return this.userSvc.getUserFromString(participant);
        })
      }
    })
  })
);

这是 getUserFromString 函数:

getUserFromString(stringId){

  let splitValue = stringId.split("_");
  let accountType = splitValue[0];
  let id = splitValue[1];

  if (accountType === 'owner') {
    return this.ownerSvc.getOwner(id);
  }
  else if (accountType === 'visitor') {
    return this.visitorSvc.getVisitor(id);
  }
}

简单获取所有者函数returns:

return this.http.get(owner_url + id);

最后,使用 angular 异步管道

在视图中展开结果
<ul><li *ngFor="let msg of chat$|async">{{msg.key}}</li></ul>

我做错了什么?

假设您的 Chat 看起来像这样:

/**
 * Type parameter is being used
 * due to participants initially being of type string[]
 * and ultimately of type User[]
 */
class Chat<T> {
  key: string;
  participants: T[]
}

考虑以下实现:

this.chat$: Observable<Chat<User>[]> = this.chatSvc.getUserChats(this.userUID).pipe(
  mergeAll(),
  mergeMap(({ key, participants }: Chat<string>) => {
    return forkJoin(participants.map(this.userSvc.getUserFromString)).pipe(
      map(participants => ({ key, participants }))
    )
  }),
  toArray()
)

解释(简体):

this.chat$ = this.chatSvc.getUserChats(this.userUID).pipe(

/**
 * flattens Observable<Chat[]> into a stream
 * of Observable<Chat> so we can handle single Chat at a time
 */
  mergeAll(), 

 /**
  *  transform each Chat into an 
  *  Observable containing a new value
  */
  mergeMap(({ key, participants }: Chat) => {

   /**
    * transform participants (String[]) into an array of Observables 
    * (Observable<User>[])
    */
    const participants$ = participants.map(this.userSvc.getUserFromString)
    
   /**
    *  wait for all Observables to complete using forkJoin
    *  then return new Chat using map
    */
    return forkJoin(participants).pipe(
      map(participants => ({ key: key, participants: participants  }))
    )
  }),
  toArray() // <= transform stream back into array (Observable<Chat[]>)
)