在 RxJs 中,你如何制作一个 throttled buffer operator?
In RxJs how can you make a throttled buffer operator?
我有以下需求:
我想在 leading
和 trailing
都设置为 true 的情况下使用 rxjs 限制函数调用。但是,我不想忽略被跳过的 fn 调用;我想将所有函数调用的输出作为数组包含在油门的开始和结束处。有点像结合 throttle
和 buffer
举个更好的例子:
我只想发出每 1 秒的点击次数。如果有很长的停顿,当下一次点击发生时,立即发出该点击的 x 和 y(在一个数组中,就好像它被缓冲了一样)。从第一次点击到 1 秒后的所有点击,都将被观察为第二个到达的数组。
这是一些我试过但似乎不起作用的代码,可在 this codesandbox:
上试用
import { pipe, asyncScheduler, interval } from "rxjs";
import { publish, throttleTime, buffer, take } from "rxjs/operators";
const throttledBuffer = (throttleTimeMs) => {
return pipe(
// publish this observable so it can be shared
publish((observable$) => {
// get invokations, and throttle them
const throttleCalls$ = observable$.pipe(
throttleTime(throttleTimeMs, asyncScheduler, {
leading: true,
trailing: true
}),
);
// buffer observable, emitting an array of accumulated values for each fn call at each observed throttle
return observable$.pipe(
buffer(throttleCalls$)
);
})
);
};
const clicks$ = interval(50);
clicks$.pipe(throttledBuffer(100),take(4)).subscribe({
next: (value) => {
console.log(value);
}
});
在上面的代码片段中,我希望得到的结果是:
[0]
[1, 2]
[3]
[4, 5]
然而,事情是这样的:
[]
[0,1,2]
[]
[3,4,5]
好像是在节气门发出声音之前发生了缓冲;或者在它之后跳过它。有人成功地制作了这样的自定义运算符吗?
TLDR;
这里有一个方法:
const throttledBuffer = (throttleTimeMs) => {
return pipe(
publish((observable$) => {
const throttleCalls$ = observable$.pipe(
// tap((v) => console.log("in throttle", v)),
throttleTime(throttleTimeMs, asyncScheduler, {
leading: true,
trailing: true
}),
// tap(() => console.log("after throttle"))
);
const src$ = observable$.pipe(share());
return merge(
src$.pipe(ignoreElements()),
src$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
);
})
);
};
const clicks$ = interval(50);
clicks$.pipe(throttledBuffer(100), take(4)).subscribe({
next: (value) => {
console.log("p", value);
}
});
/*
console output:
p
(1) [0]
p
(2) [1, 2]
p
(1) [3]
p
(2) [4, 5]
*/
您可以取消注释 tap
以检查发生了什么。
可以找到工作演示 here。
为什么上述解决方案有效以及前一个解决方案出了什么问题
了解 publish
运算符
首先,重要的是要强调 how the publish
operator works:
export function publish<T, R>(selector?: OperatorFunction<T, R>): MonoTypeOperatorFunction<T> | OperatorFunction<T, R> {
return selector ? connect(selector) : multicast(new Subject<T>());
}
在这种情况下,我们对 selector
方法很感兴趣。但是,如您所见,如果您只使用 publish()
,它将使用 multicast
和 Subject
。作为旁注,publishReplay
也使用 multicast
,但使用 ReplaySubject
而不是 Subject
。
connect
函数定义as follows:
// `source` - the further-up observable in the chain
// `subscriber` - the newly created subscriber
/*
To get a better understanding of what `subscriber` and `source` really are, let's consider this quick example
const o$ = new Observable(subscriber => {});
o$.pipe(
map(...),
).subscribe({ next: v => console.log('hello', v) })
In the `map`'s implementation, when the `map` observable is subscribed, it will call again `source.subscribe(someSubscriber)`.
In that case, `someSubscriber` is a `{ next: v => console.log('hello', v) }`(roughly) and `source` is `o$`.
So, in `subscriber => {}` from `Observable`'s callback, the `subscriber` will actually be `someSubscriber`.
*/
// It is a `Subject` instance
const subject = connector();
from(selector(fromSubscribable(subject))).subscribe(subscriber);
subscriber.add(source.subscribe(subject));
fromSubscribable(subject)
将 return 一个 Observable
实例,当订阅该实例时,订阅者将被添加到 subject
的订阅者列表中。因此,您提供给 publish
的回调函数的参数将是我之前提到的 Observable
实例。所有这一切的要点是要知道在该回调 中创建的每个 订阅者都将成为 subject
订阅者列表的一部分。因此,对于 subscriber.add(source.subscribe(subject))
,subject
将订阅 source
,这意味着来自 source
的所有值都将由 subject
接收使用,这也意味着这些值将发送给该列表中的所有订阅者。
因此,要点是您在该回调函数中创建订阅者的 order 很重要。
为什么会出现问题
现在让我们先看看问题出在哪里:
const throttledBuffer = (throttleTimeMs) => {
return pipe(
publish((observable$) => {
const throttleCalls$ = observable$.pipe(
throttleTime(throttleTimeMs, asyncScheduler, {
leading: true,
trailing: true
})
);
return observable$.pipe(buffer(throttleCalls$));
})
);
};
const clicks$ = interval(50);
clicks$.pipe(throttledBuffer(100), take(4)).subscribe({
next: (value) => {
console.log("p", value);
}
});
/*
console output:
p
[]
p
(3) [0, 1, 2]
p
[]
p
(3) [3, 4, 5]
*/
通过 returning observable$.pipe(buffer(throttleCalls$))
,本质上与
相同
observable$.pipe(buffer(throttleCalls$)).subscribe(subscriber /* #1 */);
问题终于来了:)。问题是throttleCalls$
会先被订阅,因为这背后的概念可以归结为f(g(x))
,其中g(x)
会先被调用。因为 throttleCalls$
先订阅,这意味着 subject
实例会将此订阅者注册为列表中的 第一个 。结果,subscriber#1
将是第二个。这意味着当源发出一些东西时,从throttleCalls$
'订阅创建的订阅者将是第一个接收值的订阅者,这就是为什么 你会先得到一个空数组。
请记住,Subject
将值同步发送给它的订阅者。这意味着在缓冲区发出 []
之后,第二个订阅者将立即收到该值并且 buffer
的数组将为 [0]
。由于一秒还未过去,1
和 2
将到达,当第二秒最终过去时,buffer
的数组将为 [0, 1, 2]
。然后,再次发出 3
,但是接收它的 第一个订阅者 将属于 throttleCalls$
,即 buffer
的通知程序,因此,我们再次得到一个空数组。
所以,问题是订阅者添加到 Subject
的订阅者列表的顺序不是我们想要的。
为什么该解决方案有效
让我们再次看看 TLDR 部分的解决方案:
const throttledBuffer = (throttleTimeMs) => {
return pipe(
publish((observable$) => {
const throttleCalls$ = observable$.pipe(
// tap((v) => console.log("in throttle", v)),
throttleTime(throttleTimeMs, asyncScheduler, {
leading: true,
trailing: true
}),
// tap(() => console.log("after throttle"))
);
const src$ = observable$.pipe(share());
return merge(
src$.pipe(ignoreElements()),
src$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
);
})
);
};
const clicks$ = interval(50);
clicks$.pipe(throttledBuffer(100), take(4)).subscribe({
next: (value) => {
console.log("p", value);
}
});
/*
console output:
p
(1) [0]
p
(2) [1, 2]
p
(1) [3]
p
(2) [4, 5]
*/
有了上一节的知识点,我们一步一步来了解是怎么回事。我们知道问题出在订单上,所以我们可以解决这个问题。我们可以通过使用 merge
来获得我们想要的顺序:
merge(
// `ignoreElements()` will ignore `next`, but will let `error`/`complete` notification in
// By providing this as the first argument, the we can be sure that the first emitted value
// will be **first** part of the buffer, and then the `buffer`'s notifier will emit
src$.pipe(ignoreElements()),
// Ensuring the `buffer`'s notifier is the **second** in the list
src$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
);
但是,为了确保一切按预期工作,还有一件事要做,那就是
const src$ = observable$.pipe(share());
share()
所做的是在数据消费者(2 个 merge
参数)和数据生产者(observable$
)之间添加一个 Subject
实例。
我们需要这个的原因最好借助这张小图来解释:
s1 (the `observable$` argument)
/ \
s2 throttleCalls$
/ \
(1)ignoreElements (2)buffer(throttleCalls$)
s2 - the Subject we got from `share()`
所以,当s1
收到一个值时,s2
会先收到,然后是(1)
,然后是(2)
,最后是throttleCalls$
。这就是我们想要的,因为我们首先添加到缓冲区,然后 buffer
的通知程序(即 throttleCalls$
)发出。
现在您可能想知道,为什么它不能像这样工作?
// const src$ = observable$.pipe(share());
return merge(
observable$.pipe(ignoreElements()),
observable$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
);
上述片段的图表如下所示:
s1 (the `observable$` argument)
/ \
(1)ignoreElements (2)buffer(throttleCalls$)
好吧,我们又回到了最初遇到的同样问题,throttleCalls$
先订阅,因为 (1)
和 (2)
完全独立.订阅者列表(大致)如下所示:
[
ignoreElements,
throttleCalls$,
(2) - the `buffer`'s source
]
我有以下需求:
我想在 leading
和 trailing
都设置为 true 的情况下使用 rxjs 限制函数调用。但是,我不想忽略被跳过的 fn 调用;我想将所有函数调用的输出作为数组包含在油门的开始和结束处。有点像结合 throttle
和 buffer
举个更好的例子:
我只想发出每 1 秒的点击次数。如果有很长的停顿,当下一次点击发生时,立即发出该点击的 x 和 y(在一个数组中,就好像它被缓冲了一样)。从第一次点击到 1 秒后的所有点击,都将被观察为第二个到达的数组。
这是一些我试过但似乎不起作用的代码,可在 this codesandbox:
上试用import { pipe, asyncScheduler, interval } from "rxjs";
import { publish, throttleTime, buffer, take } from "rxjs/operators";
const throttledBuffer = (throttleTimeMs) => {
return pipe(
// publish this observable so it can be shared
publish((observable$) => {
// get invokations, and throttle them
const throttleCalls$ = observable$.pipe(
throttleTime(throttleTimeMs, asyncScheduler, {
leading: true,
trailing: true
}),
);
// buffer observable, emitting an array of accumulated values for each fn call at each observed throttle
return observable$.pipe(
buffer(throttleCalls$)
);
})
);
};
const clicks$ = interval(50);
clicks$.pipe(throttledBuffer(100),take(4)).subscribe({
next: (value) => {
console.log(value);
}
});
在上面的代码片段中,我希望得到的结果是:
[0]
[1, 2]
[3]
[4, 5]
然而,事情是这样的:
[]
[0,1,2]
[]
[3,4,5]
好像是在节气门发出声音之前发生了缓冲;或者在它之后跳过它。有人成功地制作了这样的自定义运算符吗?
TLDR;
这里有一个方法:
const throttledBuffer = (throttleTimeMs) => {
return pipe(
publish((observable$) => {
const throttleCalls$ = observable$.pipe(
// tap((v) => console.log("in throttle", v)),
throttleTime(throttleTimeMs, asyncScheduler, {
leading: true,
trailing: true
}),
// tap(() => console.log("after throttle"))
);
const src$ = observable$.pipe(share());
return merge(
src$.pipe(ignoreElements()),
src$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
);
})
);
};
const clicks$ = interval(50);
clicks$.pipe(throttledBuffer(100), take(4)).subscribe({
next: (value) => {
console.log("p", value);
}
});
/*
console output:
p
(1) [0]
p
(2) [1, 2]
p
(1) [3]
p
(2) [4, 5]
*/
您可以取消注释 tap
以检查发生了什么。
可以找到工作演示 here。
为什么上述解决方案有效以及前一个解决方案出了什么问题
了解 publish
运算符
首先,重要的是要强调 how the publish
operator works:
export function publish<T, R>(selector?: OperatorFunction<T, R>): MonoTypeOperatorFunction<T> | OperatorFunction<T, R> {
return selector ? connect(selector) : multicast(new Subject<T>());
}
在这种情况下,我们对 selector
方法很感兴趣。但是,如您所见,如果您只使用 publish()
,它将使用 multicast
和 Subject
。作为旁注,publishReplay
也使用 multicast
,但使用 ReplaySubject
而不是 Subject
。
connect
函数定义as follows:
// `source` - the further-up observable in the chain
// `subscriber` - the newly created subscriber
/*
To get a better understanding of what `subscriber` and `source` really are, let's consider this quick example
const o$ = new Observable(subscriber => {});
o$.pipe(
map(...),
).subscribe({ next: v => console.log('hello', v) })
In the `map`'s implementation, when the `map` observable is subscribed, it will call again `source.subscribe(someSubscriber)`.
In that case, `someSubscriber` is a `{ next: v => console.log('hello', v) }`(roughly) and `source` is `o$`.
So, in `subscriber => {}` from `Observable`'s callback, the `subscriber` will actually be `someSubscriber`.
*/
// It is a `Subject` instance
const subject = connector();
from(selector(fromSubscribable(subject))).subscribe(subscriber);
subscriber.add(source.subscribe(subject));
fromSubscribable(subject)
将 return 一个 Observable
实例,当订阅该实例时,订阅者将被添加到 subject
的订阅者列表中。因此,您提供给 publish
的回调函数的参数将是我之前提到的 Observable
实例。所有这一切的要点是要知道在该回调 中创建的每个 订阅者都将成为 subject
订阅者列表的一部分。因此,对于 subscriber.add(source.subscribe(subject))
,subject
将订阅 source
,这意味着来自 source
的所有值都将由 subject
接收使用,这也意味着这些值将发送给该列表中的所有订阅者。
因此,要点是您在该回调函数中创建订阅者的 order 很重要。
为什么会出现问题
现在让我们先看看问题出在哪里:
const throttledBuffer = (throttleTimeMs) => {
return pipe(
publish((observable$) => {
const throttleCalls$ = observable$.pipe(
throttleTime(throttleTimeMs, asyncScheduler, {
leading: true,
trailing: true
})
);
return observable$.pipe(buffer(throttleCalls$));
})
);
};
const clicks$ = interval(50);
clicks$.pipe(throttledBuffer(100), take(4)).subscribe({
next: (value) => {
console.log("p", value);
}
});
/*
console output:
p
[]
p
(3) [0, 1, 2]
p
[]
p
(3) [3, 4, 5]
*/
通过 returning observable$.pipe(buffer(throttleCalls$))
,本质上与
observable$.pipe(buffer(throttleCalls$)).subscribe(subscriber /* #1 */);
问题终于来了:)。问题是throttleCalls$
会先被订阅,因为这背后的概念可以归结为f(g(x))
,其中g(x)
会先被调用。因为 throttleCalls$
先订阅,这意味着 subject
实例会将此订阅者注册为列表中的 第一个 。结果,subscriber#1
将是第二个。这意味着当源发出一些东西时,从throttleCalls$
'订阅创建的订阅者将是第一个接收值的订阅者,这就是为什么 你会先得到一个空数组。
请记住,Subject
将值同步发送给它的订阅者。这意味着在缓冲区发出 []
之后,第二个订阅者将立即收到该值并且 buffer
的数组将为 [0]
。由于一秒还未过去,1
和 2
将到达,当第二秒最终过去时,buffer
的数组将为 [0, 1, 2]
。然后,再次发出 3
,但是接收它的 第一个订阅者 将属于 throttleCalls$
,即 buffer
的通知程序,因此,我们再次得到一个空数组。
所以,问题是订阅者添加到 Subject
的订阅者列表的顺序不是我们想要的。
为什么该解决方案有效
让我们再次看看 TLDR 部分的解决方案:
const throttledBuffer = (throttleTimeMs) => {
return pipe(
publish((observable$) => {
const throttleCalls$ = observable$.pipe(
// tap((v) => console.log("in throttle", v)),
throttleTime(throttleTimeMs, asyncScheduler, {
leading: true,
trailing: true
}),
// tap(() => console.log("after throttle"))
);
const src$ = observable$.pipe(share());
return merge(
src$.pipe(ignoreElements()),
src$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
);
})
);
};
const clicks$ = interval(50);
clicks$.pipe(throttledBuffer(100), take(4)).subscribe({
next: (value) => {
console.log("p", value);
}
});
/*
console output:
p
(1) [0]
p
(2) [1, 2]
p
(1) [3]
p
(2) [4, 5]
*/
有了上一节的知识点,我们一步一步来了解是怎么回事。我们知道问题出在订单上,所以我们可以解决这个问题。我们可以通过使用 merge
来获得我们想要的顺序:
merge(
// `ignoreElements()` will ignore `next`, but will let `error`/`complete` notification in
// By providing this as the first argument, the we can be sure that the first emitted value
// will be **first** part of the buffer, and then the `buffer`'s notifier will emit
src$.pipe(ignoreElements()),
// Ensuring the `buffer`'s notifier is the **second** in the list
src$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
);
但是,为了确保一切按预期工作,还有一件事要做,那就是
const src$ = observable$.pipe(share());
share()
所做的是在数据消费者(2 个 merge
参数)和数据生产者(observable$
)之间添加一个 Subject
实例。
我们需要这个的原因最好借助这张小图来解释:
s1 (the `observable$` argument)
/ \
s2 throttleCalls$
/ \
(1)ignoreElements (2)buffer(throttleCalls$)
s2 - the Subject we got from `share()`
所以,当s1
收到一个值时,s2
会先收到,然后是(1)
,然后是(2)
,最后是throttleCalls$
。这就是我们想要的,因为我们首先添加到缓冲区,然后 buffer
的通知程序(即 throttleCalls$
)发出。
现在您可能想知道,为什么它不能像这样工作?
// const src$ = observable$.pipe(share());
return merge(
observable$.pipe(ignoreElements()),
observable$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
);
上述片段的图表如下所示:
s1 (the `observable$` argument)
/ \
(1)ignoreElements (2)buffer(throttleCalls$)
好吧,我们又回到了最初遇到的同样问题,throttleCalls$
先订阅,因为 (1)
和 (2)
完全独立.订阅者列表(大致)如下所示:
[
ignoreElements,
throttleCalls$,
(2) - the `buffer`'s source
]