Observable.forkJoin 和数组参数

Observable.forkJoin and array argument

在 Observables forkJoin 文档中,它说 args 可以是一个数组,但它没有列出这样做的示例:

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/forkjoin.md

我尝试了一个与我列出的(下面)类似的函数,但出现错误:

:3000/angular2/src/platform/browser/browser_adapter.js:76 

EXCEPTION: TypeError: Observable_1.Observable.forkJoin is not a function

下面是我的函数的剪切版本:

processStuff( inputObject ) {
  let _self = this;

  return new Observable(function(observer) {
    let observableBatch = [];

    inputObject.forEach(function(componentarray, key) {
      observableBatch.push(_self.http.get(key + '.json').map((res: Response) => res.json()));
    });

    Observable.forkJoin(
      observableBatch
    // );
    ).subscribe(() => {
      observer.next();
      observer.complete();
    });

  });
}

我的问题的根源与在继续之前结束的循环有关:

但我还没有完全掌握 forkJoin 与数组的正确用法以及这样做的正确语法。

非常感谢您提供的帮助。

注意:RETURNS 可观察

的第三个函数示例
thirdFunction() {
  let _self = this;

  return Observable.create((observer) => {
  // return new Observable(function(observer) {
    ...

    observer.next(responseargs);
    observer.complete();
  });
}

processStuff(inputObject) {
  let _self = this;
  let observableBatch = [];

  inputObject.forEach((componentarray, key) => {
    observableBatch.push(_self.thirdFunction().map((res: Response) => res.json()));
  });

  return Observable.forkJoin(observableBatch);
}

elsewhere() {
  this.processStuff(inputObject)
    .subscribe()
}

您需要导入默认未加载的运算符。这就是 EXCEPTION Observable.xxxx is not a function 通常的意思。您可以通过将完整的 rxjs 添加到 bootstrap 来导入所有运算符,例如:

import 'rxjs/Rx'

或通过导入特定运算符,在您的情况下:

import 'rxjs/add/observable/forkJoin'

另一个 observation/suggestion 关于您的代码:尝试坚持使用一种语法。你在混合使用 es5、es6、typescript……当它工作时,它只会让你在长 运行 中感到困惑。此外,如果您刚开始使用 Observables,请尽量避免使用 new Observable() 并改用创建运算符;

processStuff( inputObject ) {
  let observableBatch = [];

  inputObject.forEach(( componentarray, key ) => {
    observableBatch.push( this.http.get( key + '.json').map((res: Response) => res.json()) );
  });

  return Observable.forkJoin(observableBatch);
}

elsewhere() {
  this.processStuff( inputObject )
    .subscribe()
}

最后,请参阅正确的文档 - Angular2 使用 RxJS v5 并且您提供的 link 是针对 RxJS v4 的。 v5 的文档仍然不完整,但您可以在许多源文件中找到描述。

这是一个使用 Angular 12 的工作演示。语法与接受的答案有很大不同(不再可观察。):

https://stackblitz.com/edit/angular-map-to-forkjoin?file=src/app/app.component.ts

注意:打开开发人员工具控制台以查看我正在执行的操作的输出

首先,这需要一组 JSON 项(在本例中,一组用户)并使用 map 方法将它们转换为单独的 HTTP 补丁调用。

  private mapCalls(users: any[]): Observable<any>[] {
    return users.map(user => {
      // Each User will be patched to the endpoint
      // None of these calls will be made until forkJoin is used
      return this.http.patch(
        `https://jsonplaceholder.typicode.com/users/${user.id}`,
        user
      );
    });
  }

现在是一个 Observable 数组。 forkJoin 添加为包装器以便进行这些调用

return forkJoin(mappedCalls);

然后可以订阅最终结果,最终结果将输出为具有相同项目数的单个数组。

注意:如果一个调用在 forkJoin 中失败,它们都将失败。