如何在 Rx.NET 中缓冲达到一定总大小的项目?
How to buffer items up to a certain total size in Rx.NET?
鉴于:
Observable<T> src
T
有一个整数 属性 Size
- 某些
SizeThreshold > 0
需要 Observable<List<T>>
其中:
list.Sum(o => o.Size)
>=SizeThreshold
list.Take(list.Length - 1).Sum(o => o.Size)
< SizeThreshold
即我需要 Buffer
的变体。
我的问题 - 我是否需要从头开始编写它,或者 Rx.NET 是否有任何现成的东西可以使用?
Buffer
方法具有边界重载。您可以通过扫描从您的初始可观察对象创建边界可观察对象。
IObservable<Foo> src = Observable.Range(1, 200).Select(i => new Foo() {Size = i});
int sizeThreshold = 300;
var bufferBoundaries = src.Scan(0, (seed, val) =>
{
if (seed >= sizeThreshold)
{
return val.Size;
}
return seed + val.Size;
}).Where(x => x >= sizeThreshold);
var result = src.Buffer(bufferBoundaries);
和数据class
class Foo
{
public int Size { get; set; }
}
正在打印结果
result.Subscribe(x =>
{
Console.WriteLine(String.Format("New buffer. Count {0}. Sum {1}", x.Count, x.Sum(y => y.Size)));
foreach (var foo in x)
{
Console.WriteLine(foo.Size);
}
});
更新
这个解决方案有一个小问题。缓冲区的结束由 bufferBoundaries 或 src complete 确定。因此,当初始可观察对象的最后一个元素关闭缓冲区时,您将得到一个空列表。
例如:
src = {3, 3}
sizeThreshold = 5;
结果会是
{ [3, 3], [] }
更新 2
此解决方案有效,因为 observables 以某种方式由 rx 同步。
如果我们添加一些异步行为,它就会中断。
例如,如果我们将结果更改为
var result = src.ObserveOn(NewThreadScheduler.Default).Buffer(bufferBoundaries);
或者改变bufferBoundaries
var bufferBoundaries = src.ObserveOn(NewThreadScheduler.Default).Scan...
鉴于:
Observable<T> src
T
有一个整数 属性Size
- 某些
SizeThreshold > 0
需要 Observable<List<T>>
其中:
list.Sum(o => o.Size)
>=SizeThreshold
list.Take(list.Length - 1).Sum(o => o.Size)
<SizeThreshold
即我需要 Buffer
的变体。
我的问题 - 我是否需要从头开始编写它,或者 Rx.NET 是否有任何现成的东西可以使用?
Buffer
方法具有边界重载。您可以通过扫描从您的初始可观察对象创建边界可观察对象。
IObservable<Foo> src = Observable.Range(1, 200).Select(i => new Foo() {Size = i});
int sizeThreshold = 300;
var bufferBoundaries = src.Scan(0, (seed, val) =>
{
if (seed >= sizeThreshold)
{
return val.Size;
}
return seed + val.Size;
}).Where(x => x >= sizeThreshold);
var result = src.Buffer(bufferBoundaries);
和数据class
class Foo
{
public int Size { get; set; }
}
正在打印结果
result.Subscribe(x =>
{
Console.WriteLine(String.Format("New buffer. Count {0}. Sum {1}", x.Count, x.Sum(y => y.Size)));
foreach (var foo in x)
{
Console.WriteLine(foo.Size);
}
});
更新
这个解决方案有一个小问题。缓冲区的结束由 bufferBoundaries 或 src complete 确定。因此,当初始可观察对象的最后一个元素关闭缓冲区时,您将得到一个空列表。
例如:
src = {3, 3}
sizeThreshold = 5;
结果会是
{ [3, 3], [] }
更新 2
此解决方案有效,因为 observables 以某种方式由 rx 同步。 如果我们添加一些异步行为,它就会中断。 例如,如果我们将结果更改为
var result = src.ObserveOn(NewThreadScheduler.Default).Buffer(bufferBoundaries);
或者改变bufferBoundaries
var bufferBoundaries = src.ObserveOn(NewThreadScheduler.Default).Scan...