Observable.Window 和 .Zip 无法正常运行

Observable.Window and .Zip not functioning like I would expect

我正在尝试将 IEnumerable 转换为 IObservable,以一秒的间隔传送其项目。

var spartans = Enumerable.Range(0, 300).ToObservable();

spartans
    .Window(30)
    .Zip(Observable.Timer(DateTimeOffset.Now, TimeSpan.FromMilliseconds(1000)), (x, _) => x)
    .SelectMany(w => w)
    .Subscribe(
        n => Console.WriteLine("{0}", n),
        () => Console.WriteLine("all end"));

使用此代码,十秒后唯一打印的是 "all end"。如果我删除 .Zip 那么整个序列会立即打印,如果我删除 .Window.SelectMany 那么整个序列每秒打印一个项目。如果我查看传递给 SelectMany 的 lambda 中的 "windowed" observable,我可以看到它是空的。我的问题是,为什么?

我不确定如何使它与 Window 一起工作,但是这个怎么样:

var spartans = Enumerable.Range(0, 300).ToObservable();

spartans
    .Select(x => Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => x))
    .Merge(30);

问题的发生是因为 Window 如何处理计数 - 而这个不是特别直观!

如您所知,Window 提供流的流。然而,通过计数,child 流是“温暖的”——即当这个流的观察者在它的 OnNext 处理程序中收到一个新的 window 时,它必须在它放弃之前订阅它将控制权交还给可观察对象,否则事件将丢失。

Zip 不会“知道”它正在处理这种情况,并且不会让您有机会在它抓住下一个之前订阅每个 child window。

如果删除 Zip,您会看到所有事件,因为 SelectMany 订阅了所有 child windows 因为它收到它们。

最简单的解决方法是使用 Buffer 而不是 Window - 进行一项更改,您的代码就可以工作了。这是因为 Buffer 的工作方式与 SelectMany 非常相似,通过这样做有效地保留了 windows:

Window(30).SelectMany(x => x.ToList())

元素不再温暖 windows,而是结晶为列表,您的 Zip 现在将按预期工作,以下 SelectMany 将列表展平。

重要的性能考虑因素

需要注意的是,这种方法会导致整个IEnumerable<T>一次性完成运行。如果应延迟评估源可枚举(这通常是可取的),则您需要采用不同的方式。使用下游 Observable 来控制上游 Observable 的步伐是一个棘手的问题。

让我们用辅助方法替换您的枚举,这样我们就可以看到每批 30 的评估时间:

static IEnumerable<int> Spartans()
{
    for(int i = 0; i < 300; i++)
    {
        if(i % 30 == 0)
            Console.WriteLine("30 More!");
        
        yield return i;            
    }
}

并像这样使用它(这里使用 Buffer“修复”,但行为与 Window 类似):

Spartans().ToObservable()
          .Buffer(30)
          .Zip(Observable.Timer(DateTimeOffset.Now, 
                                TimeSpan.FromMilliseconds(1000)),
               (x, _) => x)
          .SelectMany(w => w)
          .Subscribe(
              n => Console.WriteLine("{0}", n),
              () => Console.WriteLine("all end")); 

然后您会看到这样的输出,它演示了源可枚举是如何一次性耗尽的:

30 More!
0
1
...miss a few...
29
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30
31
32
...etc...

要真正调整源的节奏,而不是直接使用 ToObservable(),您可以执行以下操作。请注意 Spartans() IEnumerable<T> 上的 Buffer 操作来自 nuget 包 Ix-Main - 由 Rx 团队添加以填补 IEnumerable<T> monad 上的一些漏洞:

var spartans = Spartans().Buffer(30);
var pace = Observable.Timer(DateTimeOffset.Now, TimeSpan.FromMilliseconds(1000));
       
pace.Zip(spartans, (_,x) => x)
    .SelectMany(x => x)
    .Subscribe(
        n => Console.WriteLine("{0}", n),
        () => Console.WriteLine("all end"));  

并且输出可能成为更理想的惰性评估输出:

30 More!
0
1
2
...miss a few...
29
30 More!
30
31
32
...miss a few...
59
30 More!
60
61
62
...etc