Window 和 SelectMany 中的直观解释

Intuitive explanation in Window and SelectMany

我试图理解以下结合了 windowSelectMany 的示例。据我所知,一旦 Observable 开始发送项目,Window 就会创建一个 IObservable 流 (IObservable<IObservable<T>>)。此流用 SelectMany 展平以获得 <IObservable<T>

在这个 video tutorial 中,当在一个简单的 IObservable 中使用 SelectMany 时,会创建一个新流(比如说 "thread"),其中一个函数 f 已应用。

问题:

  1. 能否请您解释一下数据是如何到达订阅的以及视频教程中的 marble diagram 是什么?
  2. 用于展平流的函数到底是什么?
  3. 最后,是否可以提供一个更复杂的示例,其中 SelectMany 应用一个简单的函数,如 Range,以及如何在弹珠图中描述结果?

谢谢。

var source = Observable.Interval(TimeSpan.FromMilliseconds(100))// start a counter from zero and last 100 milliseconds
        .Take(5) // take only the first 5 items  
        .Window(2)
        .SelectMany(c => c);

source.Subscribe(
    value => Console.WriteLine(value.ToString()),
    error => Console.WriteLine(error.Message),
    () => Console.WriteLine("Completed")
);

Console.WriteLine("Press any key to exit");
Console.ReadKey();

以上代码每一步弹珠图如下:

interval: 0----1----2----3----4----5----6----7----
take5   : 0----1----2----3----4|
window  : W1--------W2--------W3|
 window1: 0----1|
 window2:           2----3|
 window3:                     4|
slctmny : 0----1----2----3----4|

SelectMany 采用类型 TSource 的可观察对象,以及将单个 TSource 对象转换为 IObservable<TResult> 对象的选择器函数。它将选择器函数应用于所有输入,生成 IObservable<IObservable<TResult>>,然后展平(合并)为一个 IObservable<TResult> 对象。 .SelectMany(selector) 在 Rx 中几乎等同于 .Select(selector).Merge()。因此,如果您有一个 SelectMany(o => o) 调用,就像您的情况一样,这与 Merge 调用相同。

在我们的例子中,Window 是 returning IObservable<IObservable<long>>,所以 TSource 是类型 IObservable<long>。选择器函数returns IObservable<long>,所以TResultlong.

这是一个使用 Range 的示例:选择器将 return 看起来像 {0}, {0, 1}, {0, 1, 2}, {0, 1, 2, 3} 等,但 SelectMany 将其展平,因此您得到一个可观察值:{0, 0, 1, 0, 1, 2, 0, 1, 2, 3...}

var source = Observable.Interval(TimeSpan.FromMilliseconds(100))// start a counter from zero and last 100 milliseconds
        .Take(5) // take only the first 5 items  
        .SelectMany(l => Observable.Range(0, (int)l));