有没有办法循环 Observable 并使用前一次迭代影响下一次迭代?

Is there a way to loop an Observable and use the previous iteration to influence the next?

我是 rxjs 的新手,想用它来构建视频下载器。目的是 运行 全天候 24/7 并自动录制偶尔的直播供以后观看。这是我目前所拥有的。

import { BehaviorSubject, from, defer, of } from "rxjs";
import { delay, mergeMap, repeat, tap } from "rxjs/operators";

const downloader = url => {
  const defaultDelay = 1000;
  const maxDelay = 10000;
  const delayTime = new BehaviorSubject(defaultDelay);

  /*
   * Simulated download output.
   * 
   * @return {String|Number} potentialOutput 
   *         A {Number} 1 means "FAILURE, stream is offline."
   *         A {String} means "SUCCESS, video was downloaded."
   *         1 is the most likely value returned
   * 
   * greets 
   */
  function randomWithProbability() {
    var potentialOutput = [1, 1, 1, 1, 1, "/tmp/video.mp4"];
    var idx = Math.floor(Math.random() * potentialOutput.length);
    return potentialOutput[idx];
  }

  /**
   * Simulated download. Returns a promise which resolves after 1 second.
   */
  const download = url => {
    let downloadP = new Promise((resolve, reject) => {
      setTimeout(() => {
        resolve(randomWithProbability());
      }, 1000);
    });
    return from(downloadP);
  };

  /**
   * Conditionally adjust the delay inbetween download attempts.
   *   - If the video downloaded successfuly, reset the timer to it's default.
   *     (in case the stream went down by error, we want to record again ASAP.)
   *   - If the video stream was offline, increase the delay until our next download attempt.
   *     (we don't want to be rude and flood the server)
   */
  const adjustTimer = (ytdlOutput) => {
    if (typeof ytdlOutput === 'string') {
      delayTime.next(defaultDelay); // video stream exited successfully, so reset in case the stream starts again
    } else {
      let adjustedTime = (delayTime.getValue() * 2 > maxDelay) ? maxDelay : delayTime.getValue() * 2;
      delayTime.next(adjustedTime); // video stream exited abnormally, likely due to being offline. wait longer until next attempt
    }
  };

  /**
   * The Observable.
   *   1. Start with the URL of the video stream
   *   2. delay by the time defined in delayTime
   *   3. download, merging the download observable with the parent observable.
   *   4. adjust the delayTime based on download output.
   *   5. repeat the process indefinitely.
   */
  const stream = of(url)
    .pipe(
      delay(delayTime.getValue()),
      mergeMap(download),
      tap(res => {
        adjustTimer(res);
      }),
      repeat()
    )
    
  stream.subscribe(val => {
    console.log(
      `download result:${val}, delayTime:${delayTime.getValue()}`
    );
  });
};

downloader("https://example.com/files/video.mp4");

(Stackblitz)

我遇到的问题是 {BehaviorSubject} delayTime 没有在我的循环的每次迭代中得到更新。 delayTime is 正在更新,如 delayTime.getValue() 在订阅者的回调中调用所指示的那样,但这些更改不会影响[的内存(?) =24=](?).

相反,我看到 observable 的范围 (?) 中的 delayTime 保持不变,就像它第一次订阅时一样。在 observable 的世界中,BehaviorSubject 的值没有更新,正如我希望的那样。

这就是我被困的地方。我如何重构我的代码以具有一个随时间变化的延迟计时器,并影响到下一次下载尝试的延迟?

暂时忽略 rxjs,假装你不知道这些函数是什么意思,看看这段代码:

  const stream = of(url)
    .pipe(
      delay(delayTime.getValue()),
      mergeMap(download),
      tap(res => {
        adjustTimer(res);
      }),
      repeat()
    )

匿名的简单版本是

someFunc(delayTime.getValue())

这里的问题是 delayTime.getValue() 直接计算,而不是在 someFunc 运行时计算。上面的代码也是如此:评估发生在创建 stream 变量时,而不是在每次“迭代”(更好的词:排放)时发生。


延迟运算符仅适用于固定延迟。出于您的目的,您希望使用 delayWhen,它针对每次排放进行评估:

delayWhen(() => timer(delayTime.getValue())

但是,请注意,我们需要 return 一个可观察的通知程序,而不是所需的毫秒延迟。


最后一点,访问 getValue 是未正确使用可观察对象的危险信号。这也是为什么我们实际上不使用提供给 delayWhen 中的回调的参数的原因。您的代码可以进行重构以使其具有适当的反应性,但这超出了此处的范围。