RxJS:(时间)缓冲区,在下一次发射后开始
RxJS: (Time) Buffer that starts after next emittion
我想知道如何使用 RxJs (4/5) 正确地实现它?
-a-- -b----c----d-----------------------------------------------------------e------f---------------------
-5-sec after-"a"--> [abcd]---new 5 sec timer will start when "e" emited-----5 sec-after-"e"->[ef]-
我认为:
.buffer(source$.throttleTime(5000).debounceTime(5000))
在 rxjs 5 中完成这项工作
最好的方法是使用缓冲区。缓冲区有一个关闭条件,您希望在引入新项目 5 秒后关闭条件。因此,假设您有一个源流,您想要的流将是:
source.buffer(source.throttle(5100).debounce(5000));
这是 rxjs 4。我认为 rxjs 的缓冲区运算符略有不同,但想法是一样的。
说明:
节流阀确保在 5100 毫秒内您只会获得第一个 "tick"。去抖将在 5000 毫秒后传播此 "tick",因为此后没有其他 "ticks"。请注意,我选择了 5100 毫秒,因为时间并不总是完美的,如果您对两者都使用 5000 毫秒,去抖动可能会反复延迟,您会感到饥饿。无论如何,您的缓冲区不会丢失数据,只是可以将其分组为大于 5000 毫秒的块。
Rxjs 5 有一个 bufferToggle 运算符,它可能看起来是一个更好的选择,但是,您同时打开和关闭缓冲区的事实可能会变得有风险,并且由于时序问题会使您丢失数据。
尝试了所有 Rxjs 5 缓冲区变体,特别是每 n 秒发出一次是否为空的 bufferTime,我最终推出了自己的 bufferTimeLazy:
function bufferTimeLazy(timeout) {
return Rx.Observable.create(subscriber => {
let buffer = [], hdl;
return this.subscribe(res => {
buffer.push(res);
if (hdl) return;
hdl = setTimeout(() => {
subscriber.next(buffer);
buffer = [];
hdl = null;
}, timeout);
}, err => subscriber.error(err), () => subscriber.complete());
});
};
// add operator
Rx.Observable.prototype.bufferTimeLazy = bufferTimeLazy;
// example
const click$ = Rx.Observable.fromEvent(document, 'click');
click$.bufferTimeLazy(5000).subscribe(events => {
console.log(`received ${events.length} events`);
});
示例:
https://jsbin.com/nizidat/6/edit?js,console,output
想法是在缓冲区中收集事件并在第一个事件后 n 秒发出缓冲区。一旦发出,清空缓冲区并保持休眠状态,直到下一个事件到达。
如果您不想向 Observable.prototype 添加运算符,只需调用函数:
bufferTimeLazy.bind(source$)(5000)
编辑:
好的,所以 Rxjs 5 还不错:
var clicks = Rx.Observable.fromEvent(document, 'click').share();
var buffered = clicks.bufferWhen(() => clicks.delay(5000));
buffered.subscribe(x => console.log(`got ${x.length} events`));
实现相同。注意 share() 以避免重复点击订阅 - YMMV。
我正在使用 RxJS 6,无法轻易找到 5 的文档。但是,这是一个很好的问题。这是我的结果,它也在 real example 重现 Angular Material.
中的错误中得到证明
source$ = source$.pipe(buffer(source$.pipe(debounceTime(5000))));
正如 Trevor 提到的,在 RXJS 6 中没有正式的方法,但显然你需要使用 debounce + buffer
才能达到那个结果。
为了使事情正常进行,在 Typescript 和类型推断中,我创建了一个名为 bufferDebounce
的自定义 OperatorFunction这个运算符更容易使用和理解。
带有类型推断的片段
type BufferDebounce = <T>(debounce: number) => OperatorFunction<T, T[]>;
const bufferDebounce: BufferDebounce = debounce => source =>
new Observable(observer =>
source.pipe(buffer(source.pipe(debounceTime(debounce)))).subscribe({
next(x) {
observer.next(x);
},
error(err) {
observer.error(err);
},
complete() {
observer.complete();
},
})
// [as many sources until no emit during 500ms]
source.pipe(bufferDebounce(500)).subscribe(console.log)
您可以在这个工作示例中尝试:https://stackblitz.com/edit/rxjs6-buffer-debounce
我想知道如何使用 RxJs (4/5) 正确地实现它?
-a-- -b----c----d-----------------------------------------------------------e------f---------------------
-5-sec after-"a"--> [abcd]---new 5 sec timer will start when "e" emited-----5 sec-after-"e"->[ef]-
我认为:
.buffer(source$.throttleTime(5000).debounceTime(5000))
在 rxjs 5 中完成这项工作
最好的方法是使用缓冲区。缓冲区有一个关闭条件,您希望在引入新项目 5 秒后关闭条件。因此,假设您有一个源流,您想要的流将是:
source.buffer(source.throttle(5100).debounce(5000));
这是 rxjs 4。我认为 rxjs 的缓冲区运算符略有不同,但想法是一样的。
说明: 节流阀确保在 5100 毫秒内您只会获得第一个 "tick"。去抖将在 5000 毫秒后传播此 "tick",因为此后没有其他 "ticks"。请注意,我选择了 5100 毫秒,因为时间并不总是完美的,如果您对两者都使用 5000 毫秒,去抖动可能会反复延迟,您会感到饥饿。无论如何,您的缓冲区不会丢失数据,只是可以将其分组为大于 5000 毫秒的块。
Rxjs 5 有一个 bufferToggle 运算符,它可能看起来是一个更好的选择,但是,您同时打开和关闭缓冲区的事实可能会变得有风险,并且由于时序问题会使您丢失数据。
尝试了所有 Rxjs 5 缓冲区变体,特别是每 n 秒发出一次是否为空的 bufferTime,我最终推出了自己的 bufferTimeLazy:
function bufferTimeLazy(timeout) {
return Rx.Observable.create(subscriber => {
let buffer = [], hdl;
return this.subscribe(res => {
buffer.push(res);
if (hdl) return;
hdl = setTimeout(() => {
subscriber.next(buffer);
buffer = [];
hdl = null;
}, timeout);
}, err => subscriber.error(err), () => subscriber.complete());
});
};
// add operator
Rx.Observable.prototype.bufferTimeLazy = bufferTimeLazy;
// example
const click$ = Rx.Observable.fromEvent(document, 'click');
click$.bufferTimeLazy(5000).subscribe(events => {
console.log(`received ${events.length} events`);
});
示例: https://jsbin.com/nizidat/6/edit?js,console,output
想法是在缓冲区中收集事件并在第一个事件后 n 秒发出缓冲区。一旦发出,清空缓冲区并保持休眠状态,直到下一个事件到达。
如果您不想向 Observable.prototype 添加运算符,只需调用函数:
bufferTimeLazy.bind(source$)(5000)
编辑: 好的,所以 Rxjs 5 还不错:
var clicks = Rx.Observable.fromEvent(document, 'click').share();
var buffered = clicks.bufferWhen(() => clicks.delay(5000));
buffered.subscribe(x => console.log(`got ${x.length} events`));
实现相同。注意 share() 以避免重复点击订阅 - YMMV。
我正在使用 RxJS 6,无法轻易找到 5 的文档。但是,这是一个很好的问题。这是我的结果,它也在 real example 重现 Angular Material.
中的错误中得到证明source$ = source$.pipe(buffer(source$.pipe(debounceTime(5000))));
正如 Trevor 提到的,在 RXJS 6 中没有正式的方法,但显然你需要使用 debounce + buffer
才能达到那个结果。
为了使事情正常进行,在 Typescript 和类型推断中,我创建了一个名为 bufferDebounce
的自定义 OperatorFunction这个运算符更容易使用和理解。
带有类型推断的片段
type BufferDebounce = <T>(debounce: number) => OperatorFunction<T, T[]>;
const bufferDebounce: BufferDebounce = debounce => source =>
new Observable(observer =>
source.pipe(buffer(source.pipe(debounceTime(debounce)))).subscribe({
next(x) {
observer.next(x);
},
error(err) {
observer.error(err);
},
complete() {
observer.complete();
},
})
// [as many sources until no emit during 500ms]
source.pipe(bufferDebounce(500)).subscribe(console.log)
您可以在这个工作示例中尝试:https://stackblitz.com/edit/rxjs6-buffer-debounce