如何使用 RxJS 运算符避免多个嵌套订阅?

How can I avoid multiple nested subscriptions using RxJS operators?

我正在使用 Angular 进行文件加密和上传 class。许多这些操作都是异步的,因此我写的方法是 returning RxJS Observables。

// 1.
private prepareUpload(file): Observable<T>;

// 2.
private encryptData(data, filekey): Observable<T>

// 3.
private uploadEncryptedData(formData, token, range): Observable<T>

// 4.
private completeUpload(updatedFilekey, token): Observable<T>

我想将这个逻辑封装在 public upload(file) 方法中,我最终使用了嵌套订阅并且它有效,但我知道这是错误的,并且在 RxJS 中有几个反模式原因。这是代码的简化版本:

public upload(file) {
    const gen = this.indexGenerator(); // generator function

    this.prepareUpload(file).subscribe(values => {
    const [response, filekey, data] = values;

    this.encryptData(data, filekey).subscribe(encryptedDataContainer => {
      const formData = this.prepareEncDataUpload(encryptedDataContainer.data, file.name)
      const range = this.getRange(file.size, gen.next().value);

      this.uploadEncryptedData(formData, response.token, range).subscribe(() => {
        if (range.isFinalPart) {
            this.completeUpload(encryptedDataContainer.updatedFilekey, response.token).subscribe(console.log);
        }
      });

    });

  });

}

我未能使用多个 RxJS 运算符的组合来清理此代码。我的目标是避免嵌套订阅,取而代之的是 return 当工作流完成时 public upload() 方法中的单个 Observable。

谢谢!

您想在订阅前使用管道。 Pipe 允许您在流发出值之前更改流中的值。此外,使用 mergeMap 来展平订阅链。这是一个概述。这并没有提供完整的解决方案 - 给我的报酬不够 ;) - 但足以为您指明正确的方向:

this.prepareUpload(file).pipe(
  tap(values => console.log('hello1', values)),
  map(values => [response, filekey, data]),
  tap(values => console.log('hello2', values)),
  mergeMap(() =>
      // essential to catchError else an HTTP error response will disable this effect - if it uses HTTP - catchError essentially prevents the stream from erroring in which case it will never emit again
      this.encryptData(data, filekey).pipe(
        map(res => res), // do something with res here if you want
        catchError(() => {
          return of(null)
        })
      )
    ),
    filter(res => !!res)
    // more mergemap stuff here
  ).subscribe(values => {
    console.log(values)
  })

提示:使用 tap 运算符 console.log 值,因为它们向下传递流

PS:未检查语法,可能缺少逗号或括号或 2

PPS: pipe中的函数都是RxJS的operator

您可以使用 mergeMap rxjs 运算符合并这些可观察对象并摆脱嵌套订阅。

虽然有一个问题, 请注意,由于 mergeMap 同时维护多个活动的内部订阅,因此可能会通过长期存在的内部订阅造成内存泄漏。

供参考和示例: https://www.learnrxjs.io/operators/transformation/mergemap.html

I think chaining your observables would do it, you can do it with flatMap (alias for mergeMap) maybe - and

正如我在评论中提到的那样,像下面这样的东西应该可以工作(伪代码):

public upload(file) {
    const gen = this.indexGenerator(); // generator function

    return Rx.Observable.just(file).pipe(
         mergeMap(this.prepareUpload),
         mergeMap(this.encryptData),
         mergeMap(this.prepareEncDataUpload),
         mergeMap(this.prepareEncDataUpload),
         .... )
}

您可以使用 RxJs 中的 mergeMapfilter 运算符并链接您的调用。您将需要创建一些函数级变量以在链接期间使用。

import { mergeMap, filter, catchError } from 'rxjs/operators`
public upload(file) {
    const gen = this.indexGenerator(); // generator function
    let range, token;
    this.prepareUpload(file)
      .pipe(
        mergeMap((values) => {
          const [response, filekey, data] = values;
          token = response.token;
          return this.encryptData(data, filekey);
        }),
        mergeMap(encryptedDataContainer => {
          const formData = this.prepareEncDataUpload(encryptedDataContainer.data, file.name)
          range = this.getRange(file.size, gen.next().value);

          return this.uploadEncryptedData(formData, token, range);
        }),
        filter(() => !!range.isFinalPart),
        mergeMap(() => {
          return this.completeUpload(encryptedDataContainer.updatedFilekey, token);
        })
        catchError((error) => {
          console.log(error);
          // handle the error accordingly.
        })
      )
      .subscribe(() => {
        console.log('success');
      });

  }