从每个时间间隔的可观察值中获取 N 个值

Take N values from observable on each interval

我有一个 Observable,它为每个 ms 传输一个值。 ,这是每 250 毫秒完成一次。 (意味着 250 毫秒内有 250 个值(给予或接受))。

模拟示例代码:

     IObservable<IEnumerable<int>> input = from _ in Observable.Interval(TimeSpan.FromMilliseconds(250))
                    select CreateSamples(250);

      input.Subscribe(values =>
        {
            for (int i = 0; i < values.Count(); i++)
            {
                Console.WriteLine("Value : {0}", i);
            }
        });

        Console.ReadKey(); 


    private static IEnumerable<int> CreateSamples(int count)
    {
        for (int i = 0; i < 250; i++)
        {
            yield return i;
        }
    }

我需要的是创建某种形式的过程可观察对象,它以每 33 毫秒 8 个值的速率处理输入可观察对象

类似的东西:

 IObservable<IEnumerable<int>> process = from _ in Observable.Interval(TimeSpan.FromMilliseconds(33)) 
                     select stream.Take(8);

我想知道两件事:

1) 我如何使用响应式扩展提供的内置运算符编写第一个示例?

2) 我如何创建从输入流中获取值的流程流 这与我描述的行为有关?

我尝试使用 Window 作为以下评论的建议。

 input.Window(TimeSpan.FromMilliseconds(33)).Take(8).Subscribe(winObservable => Debug.WriteLine(" !! "));

好像我得到了 8 个且只有 8 个未知数量的可观察值

我需要的是每 33 毫秒重复出现 8 个值。来自输入可观察。

上面的代码做的是 8 个 IEnumrable 的 observable,然后闲置。

编辑:感谢 James World。这是一个示例。

  var input = Observable.Range(1, int.MaxValue);

  var timedInput = Observable.Interval(TimeSpan.FromMilliseconds(33))
        .Zip(input.Buffer(8), (_, buffer) => buffer);

  timedInput.SelectMany(x => x).Subscribe(Console.WriteLine);

但现在它变得更棘手了,我需要计算缓冲区值 我需要通过 Intervals
之间传递的实际 MS 来完成此操作 当您编写 TimeSpan.FromMilliseconds(33) 时,计时器的间隔事件实际上会在 45 毫秒左右引发或采取 .

有什么方法可以计算缓冲区,比如 PSUDO

  input.TimeInterval().Buffer( s => s.Interval.Milliseconds / 4)

您将无法通过合理的解决方案准确地执行此操作,因为 .NET 计时器分辨率为 15 毫秒。

如果计时器 足够快,您将不得不使用定速器展平并重新打包流,例如:

// flatten stream
var fs = input.SelectMany(x => x);

// buffer 8 values and release every 33 milliseconds
var xs = Observable.Interval(TimeSpan.FromMilliseconds(33))
                   .Zip(fs.Buffer(8), (_,buffer) => buffer);

虽然正如我所说,这会给出非常紧张的时机。如果那种时序分辨率对你很重要,那就原生吧!

我同意詹姆斯的分析。

我想知道这个查询是否给你一个更好的结果:

IObservable<IList<int>> input =
    Observable
        .Generate(
            0,
            x => true,
            x => x < 250 ? x + 1 : 0,
            x => x,
            x => TimeSpan.FromMilliseconds(33.0 / 8.0))
        .Buffer(TimeSpan.FromMilliseconds(33.0));