RxJS 更新状态和直通

RxJS Update State and Passthrough

我有一个用户库,用于我组织中的多个 Angular 应用程序。作为 refactor/new 版本,我们希望转向基于状态的服务,其接口类似于:

export interface UserService {
    user: Observable<User>
    getProfile(url: string): Observable<User>
    updateProfile(url: string): Observable<User>
}

这样,消费者可以将组件绑定到 UserService.user,知道数据将始终是最新的,并在按钮或后台 activity 等中调用 get/update 函数,等等.

澄清: 用户库旨在用于使用 ngRx 的应用程序,以及那些不想要那么重的东西但仍然想要的应用程序基于可观察的用户绑定。

所以我像这样连接函数(略有变化):

public getProfile(url: string): Observable<User> {
    const obs = this._http.get<User>(url);
    obs.subscribe(profile => this.setProfile(profile));
    return obs;
}

其中 setProfile 以只读方式更新 UserService.user returns 的内部订阅。

我的一个消费者应用程序使用 ngRx 进行其余的状态管理。使用我的图书馆时,我们注意到一些奇怪的地方:

  1. 当包裹在 ngRx 效果中时,有时任何使用 Angular 的 HttpClient 的函数都会被调用多次。为了纠正这个问题,我使用 HttpClient 的任何函数都以 .publishLast().refCount() 结尾,以确保只进行一次 HTTP 调用。

  2. 如果调用失败,消费者无法捕获异常。例如,this.authService.getProfile().catch(error => alert(error)) 永远不会被调用。为了解决这个问题,我现在正在修改他们订阅的功能:obs.subscribe(profile => this.setProfile(profile), error => Observable.throw(error));

在 Angular 中实现 "state-store" 服务时,这是正常行为吗?

编辑: ngRx 效果示例(请注意,这是真实的实现,我上面发布的是我们实际实现的简化方式):

public BiographyUpdate: Observable<any> = this._actions.pipe(
    ofType(AuthActionTypes.BIOGRAPHY_UPDATE),
    map((action: BiographyUpdate) => action.payload),
    switchMap(bio => this._auth.update(`${Environment.Endpoints.Users}/users/sync/biography`, action)
      .map(profile => new BiographyUpdateSuccess(profile.biography))
      .catch(error => {
        console.log("AM I BEING HIT?");
        return Observable.of(new BiographyUpdateFailure(error.toString()))
      })
    )
  );

一旦您调用 subscribe 事件,它就会被触发,并且在订阅方法中解析结果(正确结果或错误)。所以当时,你调用 this.authService.getProfile().catch 方法,异步事件已经被触发并完成。这就是它不调用的原因。

您可以在 getProfile 中移动 catch 运算符,或者您可以 return 来自 getProfile 的可观察对象而不订阅它。

public getProfile(url: string): Observable<User> {
    return this._http.get<User>(url)
            .pipe( tap( profile => this.setProfile(profile))); 
}

现在通过authService服务订阅

this.authService.getProfile()
  .subscribe(profile =>  // do the work)
  .catch(error => alert(error))