可观察的 forkJoin 未触发

Observable forkJoin not firing

我正在尝试在两个 Observable 上使用 forkJoin。其中一个以流的形式开始...如果我直接订阅它们,我会收到响应,但 forkJoin 没有触发。有什么想法吗?

private data$: Observable<any[]>;
private statuses$: Observable<any[]>;
private queryStream = new Subject<string>();    

....

this.data$ = this.queryStream
    .startWith('')
     .flatMap(queryInput => {
            this.query = queryInput
            return this._companyService.getCompanies(this.queryRequired + ' ' + this.query, this.page, this.sort);
                })
            .share();
    
...

Observable.forkJoin(this.statuses$, this.companies$)
            .subscribe(res => {
                console.log('forkjoin');
                this._countStatus(res[0], res[1]);
            });


// This shows arrays in the console...

this.statuses$.subscribe(res => console.log(res));
this.companies$.subscribe(res => console.log(res));

// In the console
Array[9]
Array[6]

forkJoin 的一个非常常见的问题是它要求所有源 Observable 至少发出一项并且所有这些都必须完成。

换句话说,如果 this.statuses$this.companies$ 不发射任何物品并且在它们都完成之前 forkJoin 不会发射任何东西。

this.statuses$.subscribe(
    res => console.log(res),
    undefined,
    () => console.log('completed'),
);
Observable.forkJoin([
      _someService.getUsers(),
      _someService.getCustomers(),
    ])
      .subscribe((data: [Array<User>, Array<Customer>]) => {
        let users: Array<User> = data[0];
        let customer: Array<Customer> = data[1];
      }, err => {
      });





      //someService
        getUsers():Observable<User> {
          let url = '/users';
          return this._http.get(url, headers)
            .map(res => res.json());
        }

        getCustomers():Observable<Customer> {
          let url = '/customers';
          return this._http.get(url, headers)
            .map(res => res.json());
        }

forkJoin 仅在所有内部可观察对象完成时发出。 如果你需要一个相当于 forkJoin 的只听来自每个来源的单一发射,使用 combineLatest + take(1)

combineLatest(
  this.statuses$,
  this.companies$,
)
.pipe(
  take(1),
)
.subscribe(([statuses, companies]) => {
  console.log('forkjoin');
  this._countStatus(statuses, companies);
});

只要两个来源都发出,combineLatest 就会发出,take(1) 会立即取消订阅。

forkJoin 没有用,所以我使用下面的代码来解决我的问题。使用mergeMap可以将外层订阅的结果映射到内层订阅,随心所欲地订阅

this.statuses$.pipe(
    mergeMap(source => this.companies$.pipe(
        map(inner => [source , inner])
        )
    )
).subscribe(([e , r]) => {
    console.log(e , r);
})

对我来说,combineLatest 运算符就是解决方案!

.pipe(take(1)) 附加为 asObservable() 类可观察对象的管道即可。

forkJoin({
    l0: this._svc.data$.pipe(take(1)),
    l1: this._api.getLogman1(),
    l2: this._api.getLogman2(),
    l3: this._api.getLogman3(),
})
    .pipe(
        takeUntil(this._unsubscribeAll),
    )
    .subscribe(x => {
        console.log(x);
    });