如何在 Rx.NET 中缓冲达到一定总大小的项目?

How to buffer items up to a certain total size in Rx.NET?

鉴于:

  1. Observable<T> src
  2. T 有一个整数 属性 Size
  3. 某些SizeThreshold > 0

需要 Observable<List<T>> 其中:

  1. list.Sum(o => o.Size)>=SizeThreshold
  2. list.Take(list.Length - 1).Sum(o => o.Size) < SizeThreshold

即我需要 Buffer 的变体。

我的问题 - 我是否需要从头开始编写它,或者 Rx.NET 是否有任何现成的东西可以使用?

Buffer 方法具有边界重载。您可以通过扫描从您的初始可观察对象创建边界可观察对象。

IObservable<Foo> src = Observable.Range(1, 200).Select(i => new Foo() {Size = i});
int sizeThreshold = 300;

var bufferBoundaries = src.Scan(0, (seed, val) =>
{
    if (seed >= sizeThreshold)
    {
        return val.Size;
    }
    return seed + val.Size;
}).Where(x => x >= sizeThreshold);

var result = src.Buffer(bufferBoundaries);

和数据class

class Foo
{
    public int Size { get; set; }
}

正在打印结果

result.Subscribe(x =>
{
    Console.WriteLine(String.Format("New buffer. Count {0}. Sum {1}", x.Count, x.Sum(y => y.Size)));
    foreach (var foo in x)
    {
        Console.WriteLine(foo.Size);
    }
});

更新

这个解决方案有一个小问题。缓冲区的结束由 bufferBoundaries 或 src complete 确定。因此,当初始可观察对象的最后一个元素关闭缓冲区时,您将得到一个空列表。

例如:

src = {3, 3}
sizeThreshold = 5;

结果会是

{ [3, 3], [] }

更新 2

此解决方案有效,因为 observables 以某种方式由 rx 同步。 如果我们添加一些异步行为,它就会中断。 例如,如果我们将结果更改为

var result = src.ObserveOn(NewThreadScheduler.Default).Buffer(bufferBoundaries);

或者改变bufferBoundaries

var bufferBoundaries = src.ObserveOn(NewThreadScheduler.Default).Scan...