在管道运算符之间保持对变量的访问

Keep access to variables between pipe operators

我一直在尝试在节点应用程序中使用 Rxjs。 fileList$ 是来自 fs.readdirsync (字符串数组)的 return。

第一个 map() 有一个名为文件名的参数。

flatMap() readFileAsObservable() 使用 bindNodeCallback(fs.readFile) 读取文件。

我的 class Testian 需要 2 个参数; yaml-js 从读取文件创建的对象和 filename 从第一张地图创建的对象。如何在我指定的管道中访问 filename

fileList$
    .pipe(
        map((filename: string) => `${resolvedDirPath}/${filename}`),
        flatMap(
            (filePath: string) => readFileAsObservable(filePath, 'utf8') as Observable<string>
        ),
        map((fileData: string) => yaml.safeLoad(fileData) as ITestYaml),
        map((testYaml: ITestYaml) => new Testian(testYaml, [I want to use filename here])),
        flatMap((testYaml: Testian) => {
            const prom: Promise<{}> = activeTests.set(testYaml);
            outgoing.sendTest(testYaml);
            return from(prom);
        })
    )

这在任何涉及链式函数的 API 中都以类似方式处理,例如承诺。

临时变量

临时变量可用于存储超出应访问它的函数范围的值。这是一个简单但非惯用的解决方法:

let filename;

fileList$.pipe(
    map((_filename) => {
      filename = _filename;
      return `${resolvedDirPath}/${filename}`;
    }),
    flatMap((filePath) => readFileAsObservable(filePath, 'utf8')),
    map((fileData) => yaml.safeLoad(fileData)),
    map((testYaml) => new Testian(testYaml, filename)),
    flatMap((testYaml) => {
        const prom = activeTests.set(testYaml);
        outgoing.sendTest(testYaml);
        return from(prom);
    })
)

竞争条件可能存在问题,具体取决于特定的可观察值。

嵌套函数

可以嵌套使用 filename 的函数以从父作用域访问变量:

fileList$.pipe(
    flatMap((filename) => of(`${resolvedDirPath}/${filename}`).pipe(
        flatMap((filePath) => readFileAsObservable(filePath, 'utf8')),
        map((fileData) => yaml.safeLoad(fileData)),
        map((testYaml) => new Testian(testYaml, filename)
    ),
    flatMap((testYaml) => {
        const prom = activeTests.set(testYaml);
        outgoing.sendTest(testYaml);
        return from(prom);
    })
)

传递值

在可能的情况下,变量可以与其他结果一起传递:

fileList$.pipe(
    map((filename) => [filename, `${resolvedDirPath}/${filename}`]),
    flatMap(
        ([filename, filePath]) => forkJoin(filename, readFileAsObservable(filePath, 'utf8')),
    ),
    map(([filename, fileData]) => [filename, yaml.safeLoad(fileData) as ITestYaml)],
    map(([filename, testYaml]) => new Testian(testYaml, filename)),
    flatMap((testYaml) => {
        const prom = activeTests.set(testYaml);
        outgoing.sendTest(testYaml);
        return from(prom);
    })
)

异步..等待

如果一个流允许切换到 promises 和 async..await,这是可以做到的,因为函数范围的问题在 async 函数中不存在。

fileList$.pipe(
    flatMap(async (filename) => {
      const filePath = `${resolvedDirPath}/${filename}`;
      const fileData = await readFileAsObservable(filePath, 'utf8').toPromise();
      let testYaml = yaml.safeLoad(fileData);
      testYaml = new Testian(testYaml, filename);
      const prom = activeTests.set(testYaml);
      outgoing.sendTest(testYaml);
      return prom;
    })
)

因为这个 observable 已经使用了 flatMap 和 promises,所以它可以单独用 promises 安全地编写。 RxJS observables 有不适合 promise 的用例,但这不是其中之一。