如何连续执行异步功能,而不是仅与流中的最新功能并行
How to continuously execute an async function not in parallel with only the latest from the stream
几天来我一直被 rxjs 问题困扰。我将 rxjs 与 REACT 结合使用,并尝试将 属性 更改为 运行 的异步函数流,以快进/倒回播放器。然后进一步等待任何函数调用,直到回调 returns。如果在等待回调时流中发生任何 属性 更改,只需执行最后一个。然后重复这个过程。
下面是示例代码。
http://jsbin.com/jagoworawu/edit?js,console
const setCurrentTimeStream = Observable.bindNodeCallback(player.setCurrentTime);
// Instant execute setCurrentTimeStream on first iteration.
// Prevent next setCurrentTimeStream if not callback of previous setCurrentTimeStream was called.
// Skip all except last while waiting for setCurrentTimeStream callback and then execute it.
// Then repeat the process.
const lifecycle$ = props$
.distinctUntilKeyChanged('currentTime')
// tried with audit and throttle but it won't execute the last one
.audit(({ currentTime }) => setCurrentTimeStream(currentTime));
// Probably needs something more here?
编辑:
在阅读了一些文档后,我想出了这个。请提供反馈或其他解决方案,最好使用 rxjs 的本机方法。
const latestExecAsync = (input, action) => Observable.create((observer) => {
let queued = false;
let latestValue;
const dequeue = (reference) => {
queued = true;
action(reference.value, (error, value) => {
if (error) return output.error(error);
observer.next(reference.value);
if (reference !== latestValue) {
dequeue(latestValue);
} else {
queued = false;
}
});
};
const subscription = input.subscribe({
next: (value) => {
const reference = latestValue = { value };
if (!queued) dequeue(reference);
},
error: e => observer.error(e),
complete: () => observer.complete(),
});
return () => subscription.unsubscribe();
});
// Usage:
latestExecAync(
props$.distinctUntilKeyChanged('currentTime'),
({ currentTime }, cb) => player.setCurrentTime(currentTime, cb)
)
我终于想到了我正在寻找的行为。仅使用 rxjs 本机方法。 :)
const currentTime$ = props$.pluck('currentTime');
const $trailingCurrentTime = new Subject();
const stream$ = Observable
.merge(currentTime$, $trailingCurrentTime)
.audit(currentTime => setCurrentTime$(currentTime))
.filter(currentTime => {
if (currentTime !== player.currentTime) {
$trailingCurrentTime.next(currentTime);
}
return currentTime === player.currentTime;
});
几天来我一直被 rxjs 问题困扰。我将 rxjs 与 REACT 结合使用,并尝试将 属性 更改为 运行 的异步函数流,以快进/倒回播放器。然后进一步等待任何函数调用,直到回调 returns。如果在等待回调时流中发生任何 属性 更改,只需执行最后一个。然后重复这个过程。
下面是示例代码。 http://jsbin.com/jagoworawu/edit?js,console
const setCurrentTimeStream = Observable.bindNodeCallback(player.setCurrentTime);
// Instant execute setCurrentTimeStream on first iteration.
// Prevent next setCurrentTimeStream if not callback of previous setCurrentTimeStream was called.
// Skip all except last while waiting for setCurrentTimeStream callback and then execute it.
// Then repeat the process.
const lifecycle$ = props$
.distinctUntilKeyChanged('currentTime')
// tried with audit and throttle but it won't execute the last one
.audit(({ currentTime }) => setCurrentTimeStream(currentTime));
// Probably needs something more here?
编辑: 在阅读了一些文档后,我想出了这个。请提供反馈或其他解决方案,最好使用 rxjs 的本机方法。
const latestExecAsync = (input, action) => Observable.create((observer) => {
let queued = false;
let latestValue;
const dequeue = (reference) => {
queued = true;
action(reference.value, (error, value) => {
if (error) return output.error(error);
observer.next(reference.value);
if (reference !== latestValue) {
dequeue(latestValue);
} else {
queued = false;
}
});
};
const subscription = input.subscribe({
next: (value) => {
const reference = latestValue = { value };
if (!queued) dequeue(reference);
},
error: e => observer.error(e),
complete: () => observer.complete(),
});
return () => subscription.unsubscribe();
});
// Usage:
latestExecAync(
props$.distinctUntilKeyChanged('currentTime'),
({ currentTime }, cb) => player.setCurrentTime(currentTime, cb)
)
我终于想到了我正在寻找的行为。仅使用 rxjs 本机方法。 :)
const currentTime$ = props$.pluck('currentTime');
const $trailingCurrentTime = new Subject();
const stream$ = Observable
.merge(currentTime$, $trailingCurrentTime)
.audit(currentTime => setCurrentTime$(currentTime))
.filter(currentTime => {
if (currentTime !== player.currentTime) {
$trailingCurrentTime.next(currentTime);
}
return currentTime === player.currentTime;
});