我如何缓冲一个主题,使其发出有点像去抖动的主题?

How do I buffer a subject such that it emits somewhat like a debounced subject?

我尝试使用将 TimeSpan 作为输入的常规缓冲区运算符,但这有一些问题。如果满足这些条件,我只希望发出缓冲区:

  1. 至少收到一个值
  2. 自收到最后一个值以来已经过了设定的时间,但没有收到任何更多值

本质上,我希望缓冲的主题表现得像这样:

  1. 如果缓冲区为空时收到下一个值,则将值添加到缓冲区并开始超时
  2. 如果缓冲区不为空时接收到下一个值,则将值添加到缓冲区并且re-start超时
  3. 仅在超时开始和结束后,发出当前缓冲区并启动新缓冲区

与去抖动主题一样,它只会在收到值时发出,但与去抖动主题不同,它将所有值收集到缓冲区中而不是丢弃它们。

我想我需要一个调度程序来完成这个,我想我可以 copy/reused 去抖操作员的调度程序,但我找不到它。

编辑:抱歉标题gore/grammar错误

我想我知道该怎么做了。也许它会对其他人有所帮助。

MySubject
    .Buffer(() => MySubject.Throttle(TimeSpan.FromSeconds(1))
    .Subscribe(() => ... )

您的回答的唯一缺点是您正在为 MySubject 创建两个单独的订阅。如果它动态地产生可能导致悲伤的值。您应该始终使用共享订阅。

方法如下:

MySubject.Publish(ps => ps.Buffer(() => ps.Throttle(TimeSpan.FromSeconds(1.0))));

要查看使用 Publish 的区别,请尝试以下代码:

var rnd = new Random();

var source =
    Observable
        .Generate(
            0, x => x < 100, x => x + 1, x => rnd.Next(10),
            x => TimeSpan.FromSeconds(rnd.Next(4)));

这会生成一系列介于 09 之间的随机数,连续值之间的间隔介于 03 秒之间。

然后,运行 这个查询:

var query =
    source
        .Publish(ps =>
            ps
                .Do(p => Console.WriteLine($"ps1: {p}"))
                .Buffer(() =>
                    ps
                        .Do(p => Console.WriteLine($"ps2: {p}"))
                        .Throttle(TimeSpan.FromSeconds(1.0))));

输出如下:

ps1: 1
ps2: 1
ps1: 2
ps2: 2
ps1: 3
ps2: 3
ps1: 3
ps2: 3
ps1: 6
ps1: 1
ps2: 1
ps1: 3
ps2: 3
ps1: 9
ps1: 5
ps2: 5
ps1: 7
ps2: 7
ps1: 9
ps2: 9
ps1: 1
ps2: 1
ps1: 8
ps2: 8
ps1: 5
ps2: 5
ps1: 6
ps2: 6
...

请注意,ps1ps2 数字是成对的且按顺序排列 - 它始终是 ps1,然后是 ps2,然后是另一个 ps1

实际可观察​​的输出是这样的:

{ 1, 2, 3 }
{ 3 }
{ 6, 1 }
{ 3 }
{ 9, 5, 7, 9, 1, 8 }
{ 5 }
{ 6 }

现在试试不用 Publish:

var query =
    source
        .Do(p => Console.WriteLine($"ps1: {p}"))
        .Buffer(() =>
            source
                .Do(p => Console.WriteLine($"ps2: {p}"))
                .Throttle(TimeSpan.FromSeconds(1.0)));

这是输出:

ps1: 0
ps1: 5
ps2: 3
ps1: 6
ps1: 8
ps2: 9
ps1: 7
ps2: 2
ps2: 3
ps2: 4
ps1: 1
ps2: 7
ps2: 4
ps2: 2
ps1: 0
ps1: 6
ps1: 7
ps1: 9
ps1: 2
ps2: 8
ps1: 0
ps2: 3
ps2: 3
ps1: 1
ps2: 9
ps1: 4
ps2: 8
ps2: 3
ps1: 1
...

现在不再配对也不再按顺序排列。

observable 的输出是这样的:

{ 0, 5, 6, 8 }
{ }
{ 7, 1 }
{ }
{ 0, 6 }
{ 7, 9, 2, 0 }
{ }
{ 1, 4 }
{ }  
...

注意空缓冲区!

没有的结果一点也不可靠。

始终使用 Publish 以确保您共享相同的可观察源。