无法使用 FirstAsync 获得 Rx Window 的第一个 window
Can't get first window of Rx's Window with FirstAsync
我想把一个可观察序列(IObservable
)分成几个可观察序列(条件比较复杂,但为了演示我们可以简单地使用计数)。很明显,这需要 Window
.
private static IObservable<int> GenerateSequence()
{
return Observable.Range(1, 5);
}
await GenerateSequence()
.Window(2)
.Select((w, i) => new {i, w})
.Do(w => w.w.Dump($"Window {w.i}"));
输出符合预期:
Window 0-->1
Window 0-->2
Window 0-->X
Window 1-->3
Window 1-->4
Window 1-->X
Window 2-->5
Window 2-->X
(X
标记 OnCompleted
)
现在,出于某种原因,我只想要这些序列中的第一个。因此,FirstAsync
:
(await GenerateSequence()
.Window(2)
.FirstAsync())
.Dump("Window");
但奇怪的是,我根本没有输出,就好像我从 FirstAsync
得到的序列完全死了。
我对 Rx 有点陌生,所以我完全不知道这里到底发生了什么以及为什么会发生。
编辑:
你的答案有效。您 may want to look 将 Window
替换为 Buffer
。它们实际上是相同的,除了 Buffer 用于类似的场景。它 returns 是一个列表而不是数组:
var t = GenerateSequence()
.Buffer(2)
.FirstAsync()
//.Select(list => list.ToArray()) //If you're particular about Task<int[]> over Task<IList<int>>
.ToTask();
另外,.SelectMany(i => i)
可以用.Merge()
代替。
旧答案:
我一般不会将 Rx 与 await
混用。它显然得到了支持,但似乎……不直观。此代码转储第一个 window:
的输出
GenerateSequence()
.Window(2)
.FirstAsync()
.Subscribe(i => i.Dump("Window"));
这会转储每个 window:
中的第一项
GenerateSequence()
.Window(2)
.Select(o => o.FirstAsync())
.Subscribe(i => i.Dump("Window"));
从你的问题中不清楚你想要哪一个。
实际上,解决方案 非常简单(尽管我仍然不明白我的原始代码中发生了什么)- 使用 SelectMany
来展平序列:
GenerateSequence()
.Window(2)
.FirstAsync()
.Concat()
.Dump("SO");
按预期生成:
SO-->1
SO-->2
SO-->X
然后,为了实现我最初的目标——即将 monad 和 return 结果更改为 Task<int[]>
:
GenerateSequence()
.Window(2)
.FirstAsync()
.Concat()
.ToArray()
.ToTask();
我想把一个可观察序列(IObservable
)分成几个可观察序列(条件比较复杂,但为了演示我们可以简单地使用计数)。很明显,这需要 Window
.
private static IObservable<int> GenerateSequence()
{
return Observable.Range(1, 5);
}
await GenerateSequence()
.Window(2)
.Select((w, i) => new {i, w})
.Do(w => w.w.Dump($"Window {w.i}"));
输出符合预期:
Window 0-->1
Window 0-->2
Window 0-->X
Window 1-->3
Window 1-->4
Window 1-->X
Window 2-->5
Window 2-->X
(X
标记 OnCompleted
)
现在,出于某种原因,我只想要这些序列中的第一个。因此,FirstAsync
:
(await GenerateSequence()
.Window(2)
.FirstAsync())
.Dump("Window");
但奇怪的是,我根本没有输出,就好像我从 FirstAsync
得到的序列完全死了。
我对 Rx 有点陌生,所以我完全不知道这里到底发生了什么以及为什么会发生。
编辑:
你的答案有效。您 may want to look 将 Window
替换为 Buffer
。它们实际上是相同的,除了 Buffer 用于类似的场景。它 returns 是一个列表而不是数组:
var t = GenerateSequence()
.Buffer(2)
.FirstAsync()
//.Select(list => list.ToArray()) //If you're particular about Task<int[]> over Task<IList<int>>
.ToTask();
另外,.SelectMany(i => i)
可以用.Merge()
代替。
旧答案:
我一般不会将 Rx 与 await
混用。它显然得到了支持,但似乎……不直观。此代码转储第一个 window:
GenerateSequence()
.Window(2)
.FirstAsync()
.Subscribe(i => i.Dump("Window"));
这会转储每个 window:
中的第一项GenerateSequence()
.Window(2)
.Select(o => o.FirstAsync())
.Subscribe(i => i.Dump("Window"));
从你的问题中不清楚你想要哪一个。
实际上,解决方案 非常简单(尽管我仍然不明白我的原始代码中发生了什么)- 使用 SelectMany
来展平序列:
GenerateSequence()
.Window(2)
.FirstAsync()
.Concat()
.Dump("SO");
按预期生成:
SO-->1
SO-->2
SO-->X
然后,为了实现我最初的目标——即将 monad 和 return 结果更改为 Task<int[]>
:
GenerateSequence()
.Window(2)
.FirstAsync()
.Concat()
.ToArray()
.ToTask();