我如何缓冲一个主题,使其发出有点像去抖动的主题?
How do I buffer a subject such that it emits somewhat like a debounced subject?
我尝试使用将 TimeSpan
作为输入的常规缓冲区运算符,但这有一些问题。如果满足这些条件,我只希望发出缓冲区:
- 至少收到一个值
- 自收到最后一个值以来已经过了设定的时间,但没有收到任何更多值
本质上,我希望缓冲的主题表现得像这样:
- 如果缓冲区为空时收到下一个值,则将值添加到缓冲区并开始超时
- 如果缓冲区不为空时接收到下一个值,则将值添加到缓冲区并且re-start超时
- 仅在超时开始和结束后,发出当前缓冲区并启动新缓冲区
与去抖动主题一样,它只会在收到值时发出,但与去抖动主题不同,它将所有值收集到缓冲区中而不是丢弃它们。
我想我需要一个调度程序来完成这个,我想我可以 copy/reused 去抖操作员的调度程序,但我找不到它。
编辑:抱歉标题gore/grammar错误
我想我知道该怎么做了。也许它会对其他人有所帮助。
MySubject
.Buffer(() => MySubject.Throttle(TimeSpan.FromSeconds(1))
.Subscribe(() => ... )
您的回答的唯一缺点是您正在为 MySubject
创建两个单独的订阅。如果它动态地产生可能导致悲伤的值。您应该始终使用共享订阅。
方法如下:
MySubject.Publish(ps => ps.Buffer(() => ps.Throttle(TimeSpan.FromSeconds(1.0))));
要查看使用 Publish
的区别,请尝试以下代码:
var rnd = new Random();
var source =
Observable
.Generate(
0, x => x < 100, x => x + 1, x => rnd.Next(10),
x => TimeSpan.FromSeconds(rnd.Next(4)));
这会生成一系列介于 0
和 9
之间的随机数,连续值之间的间隔介于 0
和 3
秒之间。
然后,运行 这个查询:
var query =
source
.Publish(ps =>
ps
.Do(p => Console.WriteLine($"ps1: {p}"))
.Buffer(() =>
ps
.Do(p => Console.WriteLine($"ps2: {p}"))
.Throttle(TimeSpan.FromSeconds(1.0))));
输出如下:
ps1: 1
ps2: 1
ps1: 2
ps2: 2
ps1: 3
ps2: 3
ps1: 3
ps2: 3
ps1: 6
ps1: 1
ps2: 1
ps1: 3
ps2: 3
ps1: 9
ps1: 5
ps2: 5
ps1: 7
ps2: 7
ps1: 9
ps2: 9
ps1: 1
ps2: 1
ps1: 8
ps2: 8
ps1: 5
ps2: 5
ps1: 6
ps2: 6
...
请注意,ps1
和 ps2
数字是成对的且按顺序排列 - 它始终是 ps1
,然后是 ps2
,然后是另一个 ps1
。
实际可观察的输出是这样的:
{ 1, 2, 3 }
{ 3 }
{ 6, 1 }
{ 3 }
{ 9, 5, 7, 9, 1, 8 }
{ 5 }
{ 6 }
现在试试不用 Publish
:
var query =
source
.Do(p => Console.WriteLine($"ps1: {p}"))
.Buffer(() =>
source
.Do(p => Console.WriteLine($"ps2: {p}"))
.Throttle(TimeSpan.FromSeconds(1.0)));
这是输出:
ps1: 0
ps1: 5
ps2: 3
ps1: 6
ps1: 8
ps2: 9
ps1: 7
ps2: 2
ps2: 3
ps2: 4
ps1: 1
ps2: 7
ps2: 4
ps2: 2
ps1: 0
ps1: 6
ps1: 7
ps1: 9
ps1: 2
ps2: 8
ps1: 0
ps2: 3
ps2: 3
ps1: 1
ps2: 9
ps1: 4
ps2: 8
ps2: 3
ps1: 1
...
现在不再配对也不再按顺序排列。
observable 的输出是这样的:
{ 0, 5, 6, 8 }
{ }
{ 7, 1 }
{ }
{ 0, 6 }
{ 7, 9, 2, 0 }
{ }
{ 1, 4 }
{ }
...
注意空缓冲区!
没有的结果一点也不可靠。
始终使用 Publish
以确保您共享相同的可观察源。
我尝试使用将 TimeSpan
作为输入的常规缓冲区运算符,但这有一些问题。如果满足这些条件,我只希望发出缓冲区:
- 至少收到一个值
- 自收到最后一个值以来已经过了设定的时间,但没有收到任何更多值
本质上,我希望缓冲的主题表现得像这样:
- 如果缓冲区为空时收到下一个值,则将值添加到缓冲区并开始超时
- 如果缓冲区不为空时接收到下一个值,则将值添加到缓冲区并且re-start超时
- 仅在超时开始和结束后,发出当前缓冲区并启动新缓冲区
与去抖动主题一样,它只会在收到值时发出,但与去抖动主题不同,它将所有值收集到缓冲区中而不是丢弃它们。
我想我需要一个调度程序来完成这个,我想我可以 copy/reused 去抖操作员的调度程序,但我找不到它。
编辑:抱歉标题gore/grammar错误
我想我知道该怎么做了。也许它会对其他人有所帮助。
MySubject
.Buffer(() => MySubject.Throttle(TimeSpan.FromSeconds(1))
.Subscribe(() => ... )
您的回答的唯一缺点是您正在为 MySubject
创建两个单独的订阅。如果它动态地产生可能导致悲伤的值。您应该始终使用共享订阅。
方法如下:
MySubject.Publish(ps => ps.Buffer(() => ps.Throttle(TimeSpan.FromSeconds(1.0))));
要查看使用 Publish
的区别,请尝试以下代码:
var rnd = new Random();
var source =
Observable
.Generate(
0, x => x < 100, x => x + 1, x => rnd.Next(10),
x => TimeSpan.FromSeconds(rnd.Next(4)));
这会生成一系列介于 0
和 9
之间的随机数,连续值之间的间隔介于 0
和 3
秒之间。
然后,运行 这个查询:
var query =
source
.Publish(ps =>
ps
.Do(p => Console.WriteLine($"ps1: {p}"))
.Buffer(() =>
ps
.Do(p => Console.WriteLine($"ps2: {p}"))
.Throttle(TimeSpan.FromSeconds(1.0))));
输出如下:
ps1: 1
ps2: 1
ps1: 2
ps2: 2
ps1: 3
ps2: 3
ps1: 3
ps2: 3
ps1: 6
ps1: 1
ps2: 1
ps1: 3
ps2: 3
ps1: 9
ps1: 5
ps2: 5
ps1: 7
ps2: 7
ps1: 9
ps2: 9
ps1: 1
ps2: 1
ps1: 8
ps2: 8
ps1: 5
ps2: 5
ps1: 6
ps2: 6
...
请注意,ps1
和 ps2
数字是成对的且按顺序排列 - 它始终是 ps1
,然后是 ps2
,然后是另一个 ps1
。
实际可观察的输出是这样的:
{ 1, 2, 3 }
{ 3 }
{ 6, 1 }
{ 3 }
{ 9, 5, 7, 9, 1, 8 }
{ 5 }
{ 6 }
现在试试不用 Publish
:
var query =
source
.Do(p => Console.WriteLine($"ps1: {p}"))
.Buffer(() =>
source
.Do(p => Console.WriteLine($"ps2: {p}"))
.Throttle(TimeSpan.FromSeconds(1.0)));
这是输出:
ps1: 0
ps1: 5
ps2: 3
ps1: 6
ps1: 8
ps2: 9
ps1: 7
ps2: 2
ps2: 3
ps2: 4
ps1: 1
ps2: 7
ps2: 4
ps2: 2
ps1: 0
ps1: 6
ps1: 7
ps1: 9
ps1: 2
ps2: 8
ps1: 0
ps2: 3
ps2: 3
ps1: 1
ps2: 9
ps1: 4
ps2: 8
ps2: 3
ps1: 1
...
现在不再配对也不再按顺序排列。
observable 的输出是这样的:
{ 0, 5, 6, 8 }
{ }
{ 7, 1 }
{ }
{ 0, 6 }
{ 7, 9, 2, 0 }
{ }
{ 1, 4 }
{ }
...
注意空缓冲区!
没有的结果一点也不可靠。
始终使用 Publish
以确保您共享相同的可观察源。