有没有办法循环 Observable 并使用前一次迭代影响下一次迭代?
Is there a way to loop an Observable and use the previous iteration to influence the next?
我是 rxjs 的新手,想用它来构建视频下载器。目的是 运行 全天候 24/7 并自动录制偶尔的直播供以后观看。这是我目前所拥有的。
import { BehaviorSubject, from, defer, of } from "rxjs";
import { delay, mergeMap, repeat, tap } from "rxjs/operators";
const downloader = url => {
const defaultDelay = 1000;
const maxDelay = 10000;
const delayTime = new BehaviorSubject(defaultDelay);
/*
* Simulated download output.
*
* @return {String|Number} potentialOutput
* A {Number} 1 means "FAILURE, stream is offline."
* A {String} means "SUCCESS, video was downloaded."
* 1 is the most likely value returned
*
* greets
*/
function randomWithProbability() {
var potentialOutput = [1, 1, 1, 1, 1, "/tmp/video.mp4"];
var idx = Math.floor(Math.random() * potentialOutput.length);
return potentialOutput[idx];
}
/**
* Simulated download. Returns a promise which resolves after 1 second.
*/
const download = url => {
let downloadP = new Promise((resolve, reject) => {
setTimeout(() => {
resolve(randomWithProbability());
}, 1000);
});
return from(downloadP);
};
/**
* Conditionally adjust the delay inbetween download attempts.
* - If the video downloaded successfuly, reset the timer to it's default.
* (in case the stream went down by error, we want to record again ASAP.)
* - If the video stream was offline, increase the delay until our next download attempt.
* (we don't want to be rude and flood the server)
*/
const adjustTimer = (ytdlOutput) => {
if (typeof ytdlOutput === 'string') {
delayTime.next(defaultDelay); // video stream exited successfully, so reset in case the stream starts again
} else {
let adjustedTime = (delayTime.getValue() * 2 > maxDelay) ? maxDelay : delayTime.getValue() * 2;
delayTime.next(adjustedTime); // video stream exited abnormally, likely due to being offline. wait longer until next attempt
}
};
/**
* The Observable.
* 1. Start with the URL of the video stream
* 2. delay by the time defined in delayTime
* 3. download, merging the download observable with the parent observable.
* 4. adjust the delayTime based on download output.
* 5. repeat the process indefinitely.
*/
const stream = of(url)
.pipe(
delay(delayTime.getValue()),
mergeMap(download),
tap(res => {
adjustTimer(res);
}),
repeat()
)
stream.subscribe(val => {
console.log(
`download result:${val}, delayTime:${delayTime.getValue()}`
);
});
};
downloader("https://example.com/files/video.mp4");
我遇到的问题是 {BehaviorSubject} delayTime 没有在我的循环的每次迭代中得到更新。 delayTime is 正在更新,如 delayTime.getValue() 在订阅者的回调中调用所指示的那样,但这些更改不会影响[的内存(?) =24=](?).
相反,我看到 observable 的范围 (?) 中的 delayTime 保持不变,就像它第一次订阅时一样。在 observable 的世界中,BehaviorSubject 的值没有更新,正如我希望的那样。
这就是我被困的地方。我如何重构我的代码以具有一个随时间变化的延迟计时器,并影响到下一次下载尝试的延迟?
暂时忽略 rxjs,假装你不知道这些函数是什么意思,看看这段代码:
const stream = of(url)
.pipe(
delay(delayTime.getValue()),
mergeMap(download),
tap(res => {
adjustTimer(res);
}),
repeat()
)
匿名的简单版本是
someFunc(delayTime.getValue())
这里的问题是 delayTime.getValue()
直接计算,而不是在 someFunc
运行时计算。上面的代码也是如此:评估发生在创建 stream
变量时,而不是在每次“迭代”(更好的词:排放)时发生。
延迟运算符仅适用于固定延迟。出于您的目的,您希望使用 delayWhen
,它针对每次排放进行评估:
delayWhen(() => timer(delayTime.getValue())
但是,请注意,我们需要 return 一个可观察的通知程序,而不是所需的毫秒延迟。
最后一点,访问 getValue
是未正确使用可观察对象的危险信号。这也是为什么我们实际上不使用提供给 delayWhen
中的回调的参数的原因。您的代码可以进行重构以使其具有适当的反应性,但这超出了此处的范围。
我是 rxjs 的新手,想用它来构建视频下载器。目的是 运行 全天候 24/7 并自动录制偶尔的直播供以后观看。这是我目前所拥有的。
import { BehaviorSubject, from, defer, of } from "rxjs";
import { delay, mergeMap, repeat, tap } from "rxjs/operators";
const downloader = url => {
const defaultDelay = 1000;
const maxDelay = 10000;
const delayTime = new BehaviorSubject(defaultDelay);
/*
* Simulated download output.
*
* @return {String|Number} potentialOutput
* A {Number} 1 means "FAILURE, stream is offline."
* A {String} means "SUCCESS, video was downloaded."
* 1 is the most likely value returned
*
* greets
*/
function randomWithProbability() {
var potentialOutput = [1, 1, 1, 1, 1, "/tmp/video.mp4"];
var idx = Math.floor(Math.random() * potentialOutput.length);
return potentialOutput[idx];
}
/**
* Simulated download. Returns a promise which resolves after 1 second.
*/
const download = url => {
let downloadP = new Promise((resolve, reject) => {
setTimeout(() => {
resolve(randomWithProbability());
}, 1000);
});
return from(downloadP);
};
/**
* Conditionally adjust the delay inbetween download attempts.
* - If the video downloaded successfuly, reset the timer to it's default.
* (in case the stream went down by error, we want to record again ASAP.)
* - If the video stream was offline, increase the delay until our next download attempt.
* (we don't want to be rude and flood the server)
*/
const adjustTimer = (ytdlOutput) => {
if (typeof ytdlOutput === 'string') {
delayTime.next(defaultDelay); // video stream exited successfully, so reset in case the stream starts again
} else {
let adjustedTime = (delayTime.getValue() * 2 > maxDelay) ? maxDelay : delayTime.getValue() * 2;
delayTime.next(adjustedTime); // video stream exited abnormally, likely due to being offline. wait longer until next attempt
}
};
/**
* The Observable.
* 1. Start with the URL of the video stream
* 2. delay by the time defined in delayTime
* 3. download, merging the download observable with the parent observable.
* 4. adjust the delayTime based on download output.
* 5. repeat the process indefinitely.
*/
const stream = of(url)
.pipe(
delay(delayTime.getValue()),
mergeMap(download),
tap(res => {
adjustTimer(res);
}),
repeat()
)
stream.subscribe(val => {
console.log(
`download result:${val}, delayTime:${delayTime.getValue()}`
);
});
};
downloader("https://example.com/files/video.mp4");
我遇到的问题是 {BehaviorSubject} delayTime 没有在我的循环的每次迭代中得到更新。 delayTime is 正在更新,如 delayTime.getValue() 在订阅者的回调中调用所指示的那样,但这些更改不会影响[的内存(?) =24=](?).
相反,我看到 observable 的范围 (?) 中的 delayTime 保持不变,就像它第一次订阅时一样。在 observable 的世界中,BehaviorSubject 的值没有更新,正如我希望的那样。
这就是我被困的地方。我如何重构我的代码以具有一个随时间变化的延迟计时器,并影响到下一次下载尝试的延迟?
暂时忽略 rxjs,假装你不知道这些函数是什么意思,看看这段代码:
const stream = of(url)
.pipe(
delay(delayTime.getValue()),
mergeMap(download),
tap(res => {
adjustTimer(res);
}),
repeat()
)
匿名的简单版本是
someFunc(delayTime.getValue())
这里的问题是 delayTime.getValue()
直接计算,而不是在 someFunc
运行时计算。上面的代码也是如此:评估发生在创建 stream
变量时,而不是在每次“迭代”(更好的词:排放)时发生。
延迟运算符仅适用于固定延迟。出于您的目的,您希望使用 delayWhen
,它针对每次排放进行评估:
delayWhen(() => timer(delayTime.getValue())
但是,请注意,我们需要 return 一个可观察的通知程序,而不是所需的毫秒延迟。
最后一点,访问 getValue
是未正确使用可观察对象的危险信号。这也是为什么我们实际上不使用提供给 delayWhen
中的回调的参数的原因。您的代码可以进行重构以使其具有适当的反应性,但这超出了此处的范围。