Reactive Extensions:是否可以订阅 Buffer 操作结果的总和?
Reactive Extensions: Is it possible to subscribe on Sum of the result of Buffer operation?
我正在尝试获取有关聚合函数(即 Sum)结果的通知,该函数对无限序列的部分序列进行操作(最顶层,数据源序列永远不会完成)。
问题可以看这里:
var seq = Observable.Interval(TimeSpan.FromMilliseconds(20)).Buffer(10);
seq.Sum(l => l.Sum())
.Subscribe(n =>
s_log.DebugFormat("Got {0}", n));
Lambda l.Sum() 按预期调用(计算了部分总和),但从未打印 "Got ..." 行,因为从未调用订阅者。我怀疑它与原始序列的 "never ending" 字符有某种关系。
有限序列:
Observable.Range(1,100).Buffer(10);
按预期工作。
所以问题很简单:如何将无限序列的部分片段 "mark" 设为 "complete",以便聚合函数单独处理它们(并将它们的结果推送给订阅者)?
Scan是你的朋友:
seq.Scan(0L, (l1, l2) => l1 + l2.Sum())
.Subscribe(n => Console.WriteLine("Got {0}", n));
我正在尝试获取有关聚合函数(即 Sum)结果的通知,该函数对无限序列的部分序列进行操作(最顶层,数据源序列永远不会完成)。 问题可以看这里:
var seq = Observable.Interval(TimeSpan.FromMilliseconds(20)).Buffer(10);
seq.Sum(l => l.Sum())
.Subscribe(n =>
s_log.DebugFormat("Got {0}", n));
Lambda l.Sum() 按预期调用(计算了部分总和),但从未打印 "Got ..." 行,因为从未调用订阅者。我怀疑它与原始序列的 "never ending" 字符有某种关系。 有限序列:
Observable.Range(1,100).Buffer(10);
按预期工作。 所以问题很简单:如何将无限序列的部分片段 "mark" 设为 "complete",以便聚合函数单独处理它们(并将它们的结果推送给订阅者)?
Scan是你的朋友:
seq.Scan(0L, (l1, l2) => l1 + l2.Sum())
.Subscribe(n => Console.WriteLine("Got {0}", n));