Observable.Window 和 .Zip 无法正常运行
Observable.Window and .Zip not functioning like I would expect
我正在尝试将 IEnumerable
转换为 IObservable
,以一秒的间隔传送其项目。
var spartans = Enumerable.Range(0, 300).ToObservable();
spartans
.Window(30)
.Zip(Observable.Timer(DateTimeOffset.Now, TimeSpan.FromMilliseconds(1000)), (x, _) => x)
.SelectMany(w => w)
.Subscribe(
n => Console.WriteLine("{0}", n),
() => Console.WriteLine("all end"));
使用此代码,十秒后唯一打印的是 "all end"。如果我删除 .Zip
那么整个序列会立即打印,如果我删除 .Window
和 .SelectMany
那么整个序列每秒打印一个项目。如果我查看传递给 SelectMany
的 lambda 中的 "windowed" observable,我可以看到它是空的。我的问题是,为什么?
我不确定如何使它与 Window 一起工作,但是这个怎么样:
var spartans = Enumerable.Range(0, 300).ToObservable();
spartans
.Select(x => Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => x))
.Merge(30);
问题的发生是因为 Window
如何处理计数 - 而这个不是特别直观!
如您所知,Window
提供流的流。然而,通过计数,child 流是“温暖的”——即当这个流的观察者在它的 OnNext
处理程序中收到一个新的 window 时,它必须在它放弃之前订阅它将控制权交还给可观察对象,否则事件将丢失。
Zip
不会“知道”它正在处理这种情况,并且不会让您有机会在它抓住下一个之前订阅每个 child window。
如果删除 Zip
,您会看到所有事件,因为 SelectMany
订阅了所有 child windows 因为它收到它们。
最简单的解决方法是使用 Buffer
而不是 Window
- 进行一项更改,您的代码就可以工作了。这是因为 Buffer
的工作方式与 SelectMany
非常相似,通过这样做有效地保留了 windows:
Window(30).SelectMany(x => x.ToList())
元素不再温暖 windows,而是结晶为列表,您的 Zip
现在将按预期工作,以下 SelectMany
将列表展平。
重要的性能考虑因素
需要注意的是,这种方法会导致整个IEnumerable<T>
一次性完成运行。如果应延迟评估源可枚举(这通常是可取的),则您需要采用不同的方式。使用下游 Observable 来控制上游 Observable 的步伐是一个棘手的问题。
让我们用辅助方法替换您的枚举,这样我们就可以看到每批 30 的评估时间:
static IEnumerable<int> Spartans()
{
for(int i = 0; i < 300; i++)
{
if(i % 30 == 0)
Console.WriteLine("30 More!");
yield return i;
}
}
并像这样使用它(这里使用 Buffer
“修复”,但行为与 Window
类似):
Spartans().ToObservable()
.Buffer(30)
.Zip(Observable.Timer(DateTimeOffset.Now,
TimeSpan.FromMilliseconds(1000)),
(x, _) => x)
.SelectMany(w => w)
.Subscribe(
n => Console.WriteLine("{0}", n),
() => Console.WriteLine("all end"));
然后您会看到这样的输出,它演示了源可枚举是如何一次性耗尽的:
30 More!
0
1
...miss a few...
29
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30
31
32
...etc...
要真正调整源的节奏,而不是直接使用 ToObservable()
,您可以执行以下操作。请注意 Spartans()
IEnumerable<T>
上的 Buffer
操作来自 nuget 包 Ix-Main
- 由 Rx 团队添加以填补 IEnumerable<T>
monad 上的一些漏洞:
var spartans = Spartans().Buffer(30);
var pace = Observable.Timer(DateTimeOffset.Now, TimeSpan.FromMilliseconds(1000));
pace.Zip(spartans, (_,x) => x)
.SelectMany(x => x)
.Subscribe(
n => Console.WriteLine("{0}", n),
() => Console.WriteLine("all end"));
并且输出可能成为更理想的惰性评估输出:
30 More!
0
1
2
...miss a few...
29
30 More!
30
31
32
...miss a few...
59
30 More!
60
61
62
...etc
我正在尝试将 IEnumerable
转换为 IObservable
,以一秒的间隔传送其项目。
var spartans = Enumerable.Range(0, 300).ToObservable();
spartans
.Window(30)
.Zip(Observable.Timer(DateTimeOffset.Now, TimeSpan.FromMilliseconds(1000)), (x, _) => x)
.SelectMany(w => w)
.Subscribe(
n => Console.WriteLine("{0}", n),
() => Console.WriteLine("all end"));
使用此代码,十秒后唯一打印的是 "all end"。如果我删除 .Zip
那么整个序列会立即打印,如果我删除 .Window
和 .SelectMany
那么整个序列每秒打印一个项目。如果我查看传递给 SelectMany
的 lambda 中的 "windowed" observable,我可以看到它是空的。我的问题是,为什么?
我不确定如何使它与 Window 一起工作,但是这个怎么样:
var spartans = Enumerable.Range(0, 300).ToObservable();
spartans
.Select(x => Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => x))
.Merge(30);
问题的发生是因为 Window
如何处理计数 - 而这个不是特别直观!
如您所知,Window
提供流的流。然而,通过计数,child 流是“温暖的”——即当这个流的观察者在它的 OnNext
处理程序中收到一个新的 window 时,它必须在它放弃之前订阅它将控制权交还给可观察对象,否则事件将丢失。
Zip
不会“知道”它正在处理这种情况,并且不会让您有机会在它抓住下一个之前订阅每个 child window。
如果删除 Zip
,您会看到所有事件,因为 SelectMany
订阅了所有 child windows 因为它收到它们。
最简单的解决方法是使用 Buffer
而不是 Window
- 进行一项更改,您的代码就可以工作了。这是因为 Buffer
的工作方式与 SelectMany
非常相似,通过这样做有效地保留了 windows:
Window(30).SelectMany(x => x.ToList())
元素不再温暖 windows,而是结晶为列表,您的 Zip
现在将按预期工作,以下 SelectMany
将列表展平。
重要的性能考虑因素
需要注意的是,这种方法会导致整个IEnumerable<T>
一次性完成运行。如果应延迟评估源可枚举(这通常是可取的),则您需要采用不同的方式。使用下游 Observable 来控制上游 Observable 的步伐是一个棘手的问题。
让我们用辅助方法替换您的枚举,这样我们就可以看到每批 30 的评估时间:
static IEnumerable<int> Spartans()
{
for(int i = 0; i < 300; i++)
{
if(i % 30 == 0)
Console.WriteLine("30 More!");
yield return i;
}
}
并像这样使用它(这里使用 Buffer
“修复”,但行为与 Window
类似):
Spartans().ToObservable()
.Buffer(30)
.Zip(Observable.Timer(DateTimeOffset.Now,
TimeSpan.FromMilliseconds(1000)),
(x, _) => x)
.SelectMany(w => w)
.Subscribe(
n => Console.WriteLine("{0}", n),
() => Console.WriteLine("all end"));
然后您会看到这样的输出,它演示了源可枚举是如何一次性耗尽的:
30 More!
0
1
...miss a few...
29
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30
31
32
...etc...
要真正调整源的节奏,而不是直接使用 ToObservable()
,您可以执行以下操作。请注意 Spartans()
IEnumerable<T>
上的 Buffer
操作来自 nuget 包 Ix-Main
- 由 Rx 团队添加以填补 IEnumerable<T>
monad 上的一些漏洞:
var spartans = Spartans().Buffer(30);
var pace = Observable.Timer(DateTimeOffset.Now, TimeSpan.FromMilliseconds(1000));
pace.Zip(spartans, (_,x) => x)
.SelectMany(x => x)
.Subscribe(
n => Console.WriteLine("{0}", n),
() => Console.WriteLine("all end"));
并且输出可能成为更理想的惰性评估输出:
30 More!
0
1
2
...miss a few...
29
30 More!
30
31
32
...miss a few...
59
30 More!
60
61
62
...etc