如何使用 RxJs 在 Angular 中限制并发 API 请求

How To Limit Concurrent API Requests In Angular Using RxJs

我有一个 Angular 7 应用程序,可以让您上传多个文件。对于选择的每个文件,它都会向服务器发出请求。这不是很好,因为打开数百个并发调用对我的后端服务器来说似乎是一个潜在问题。

我想做的是限制我的应用程序可以发出的并发请求数。我有一个通用的 API class ,我想用它在应用程序范围内进行限制,而不仅仅是文件上传,而不是需要文件上传组件本身来管理它。

诚然,我有时对 RxJx 感到困惑,但我很确定这是可能的。

class ApiService {

    get(path: string, params: any = {}): Observable<any> {
        return this.http.get(path`, { params: params });
    }
    
    uploadFile(path: string, body: any = {}): Observable<any> {
        ...code for preparing file here...
        return this.http.post(path, body);
    }
    
}

class FileUploader {

    // called many times-- once for each file
    uploadFile(file) {
        this.apiService.uploadFile(path, body: file).subscribe(response => {
             // use response here
        })
    }
}

我想象的是,在 api class 中,不是立即在 fileUpload 或 get 函数中执行 http 调用,而是可以添加到使用最大并发性的队列或其他东西并等待拨打电话,直到有空间。但我不确定如何执行此操作,因为我已立即在文件上传器中订阅 class。

您可以使用 SubjectmergeMap 运算符

interface FileUpload {
   path: string;
   body: file;
}

export class UploadService {
  private readonly CONCURRENT_UPLOADS = 2;
  private uploadQ = new Subject<FileUpload>();

  constructor(private api: ApiService) {
    this.uploadQ.asObservable().pipe(
      mergeMap(fu => this.api.uploadFile(fu.path, fu.body)).pipe(
        // must catch error here otherwise the subscriber will fail
        // and will stop serving the Q
        catchError(err => {
          console.error('Caught error ', err);
          return of(err);
        })), this.CONCURRENT_UPLOADS),
    ).subscribe((res: WhateverResultYouGet) => {
         // process result  
      }, err => {
        // something went wrong
      });
  }

  // this is your original signature of the method but where do you get path, actually?
  /**
  * Push the file to upload Q
  */
  uploadFile(file) {
    this.uploadQ.next({path, body: file});
  }
}

您无需立即启动上传,只需将上传推送到队列即可。队列由构造函数中的订阅服务,使用 mergeMap 运算符,您可以在其中实际指定并发性。