未调用可观察订阅

Observable subscription not getting called

我有一个 Observable,我正在使用它来将承诺转换为订阅。这会生成一个集合,我需要对其进行迭代以在每个元素上调用 HTTP 服务。我正在使用 forkJoin 等待所有这些调用完成,以便我可以做其他事情,但不幸的是,我的订阅没有被调用。你看到我在这里遗漏了什么了吗?

Observable.fromPromise(this.users.getElements()).subscribe(results => {
  Observable.forkJoin(
    results.map(
      aUser => this.HttpService.submitUser(aUser).subscribe(
        results => {
          this.progress += 1;
        },
        err => {
          this.progress += 1;
          this.handleError(<any>err);
        })
    ).subscribe(
      //it never gets to either of these calls after all service calls complete
      data => {
        debugger;
        console.log(data);
        this.reset();
      },
      err => {
        debugger;
        console.log(err);
        this.reset();
      }
    ));
});

我认为您不需要在地图中订阅。

Observable.fromPromise(this.users.getElements()).subscribe(results => {
  Observable.forkJoin(
    results.map(
      aUser => this.HttpService.submitUser(aUser))
    ).subscribe(
      //it never gets to either of these calls after all service calls complete
      data => {
        debugger;
        console.log(data);
        this.reset();
      },
      err => {
        debugger;
        console.log(err);
        this.reset();
      }
    ));
});

请注意,在此处的 rxjs 示例中:

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/forkjoin.md

他们不订阅单个可观察对象——ForkJoin 让它们全部运行,然后等待它们全部 return(在您的订阅中。)

编辑:

forkjoin 源在这里:

https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/forkjoin.js

而且它看起来不像有钩子来找出每个完成的时间。我认为接近 UI 条的最佳方法是让每个映射的可观察对象单独订阅,所有这些都调用一个函数来增加 UI 计数条变量并对 [=29 进行一些测试=] 允许您使用数据。

一件事是您没有订阅传递给 forkJoin() 的每个 Observable。运营商必须自己做。

如果你想在每个 Observable 完成时得到通知,你可以使用 .do(undefined, undefined, () => {...}).

let observables = [
  Observable.of(42).do(undefined, undefined, () => console.log('done')),
  Observable.of('a').delay(100).do(undefined, undefined, () => console.log('done')),
  Observable.of(true).do(undefined, undefined, () => console.log('done')),
];

Observable.forkJoin(observables)
  .subscribe(results => console.log(results));

这将打印到控制台:

done
done
done
[ 42, 'a', true ]

最终还有 .finally() 运算符。但是,它与使用 .do().

不同

编辑:

当任何源 Observables 失败时,forkJoin() 运算符会重新发出错误(这意味着它也会失败)。
这意味着您需要分别捕获每个源 Observable 中的错误(例如使用 catch() 运算符)。

let observables = [
  Observable.throw(new Error())
    .catch(() => Observable.of('caught error 1'))
    .do(undefined, undefined, () => console.log('done 1')),

  Observable.of('a')
    .delay(100).catch(() => Observable.of('caught error 2'))
    .do(undefined, undefined, () => console.log('done 2')),

  Observable.of(true)
    .catch(() => Observable.of('caught error 3'))
    .do(undefined, undefined, () => console.log('done 3')),
];

Observable.forkJoin(observables)
  .subscribe(results => console.log(results));

打印:

done 1
done 3
done 2
[ 'caught error 1', 'a', true ]