"Buffer until quiet" 来自 Reactive 的行为?
"Buffer until quiet" behavior from Reactive?
我的问题有点像创建 Nagle algorithm 来解决的问题,但不完全是。我想要的是将来自 IObservable<T>
的 OnNext
通知缓冲到一系列 IObservable<IList<T>>
中,如下所示:
- 当第一个
T
通知到达时,将其添加到缓冲区并开始倒计时
- 如果另一个
T
通知在倒计时结束前到达,将其添加到缓冲区并重新开始倒计时
- 一旦倒计时结束(即生产者已经沉默了一段时间),将所有缓冲的
T
通知作为单个聚合 IList<T>
通知转发。
- 如果在倒计时到期之前缓冲区大小超过了某个最大值,无论如何都要发送它。
IObservable<IList<T>> Buffer(this IObservable<T>, Timespan, int, IScheduler)
看起来很有希望,但它似乎会定期发送聚合通知,而不是执行我想要的 "start the timer when the first notification arrives and restart it when additional ones arrive" 行为,并且最后还会发送一个空列表每次 window 如果下面没有产生通知。
我不想删除任何T
通知;只是缓冲它们。
是否存在这样的东西,或者我需要自己写吗?
SO 上存在一些类似的问题,但不完全是这样。
这是一个可以解决问题的扩展方法。
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>
(this IObservable<TSource> source,
int maxAmount, TimeSpan threshold)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge( g.Buffer(maxAmount).Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
有趣的运算符。 Supertopi 的回答很好,但可以进行改进。如果 maxAmount
很大,and/or 通知率很高,那么使用 Buffer
将通过分配很快就会被丢弃的缓冲区来消耗 GC。
为了在达到 maxAmount
后关闭每个 GroupBy
Observable,您不需要捕获所有这些元素的 Buffer
只是为了知道它何时已满.根据 Supertopi 的回答,您可以将其稍微更改为以下内容。它不是收集 Buffer
个 maxAmount
个元素,而是在它看到流中的 maxAmount
个元素后才发出信号。
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source, int maxAmount, TimeSpan threshold)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge(g.Take(maxAmount)
.LastAsync()
.Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
很好的解决方案。在我看来,使用现有运算符创建行为只是为了方便而不是为了性能。
此外,我们应该始终 return IEnumerable 而不是 IList。
返回最少的派生类型 (IEnumerable) 将为您留出最大的余地来更改底层实现。
这是我实现自定义运算符的版本。
public static IObservable<IEnumerable<TValue>> BufferWithThrottle<TValue>(this IObservable<TValue> @this, int maxAmount, TimeSpan threshold)
{
var buffer = new List<TValue>();
return Observable.Create<IEnumerable<TValue>>(observer =>
{
var aTimer = new Timer();
void Clear()
{
aTimer.Stop();
buffer.Clear();
}
void OnNext()
{
observer.OnNext(buffer);
Clear();
}
aTimer.Interval = threshold.TotalMilliseconds;
aTimer.Enabled = true;
aTimer.Elapsed += (sender, args) => OnNext();
var subscription = @this.Subscribe(value =>
{
buffer.Add(value);
if (buffer.Count >= maxAmount)
OnNext();
else
{
aTimer.Stop();
aTimer.Start();
}
});
return Disposable.Create(() =>
{
Clear();
subscription.Dispose();
});
});
}
通过测试与其他解决方案相比的性能,它可以节省高达 30% CPU 的电量并解决内存问题。
我的问题有点像创建 Nagle algorithm 来解决的问题,但不完全是。我想要的是将来自 IObservable<T>
的 OnNext
通知缓冲到一系列 IObservable<IList<T>>
中,如下所示:
- 当第一个
T
通知到达时,将其添加到缓冲区并开始倒计时 - 如果另一个
T
通知在倒计时结束前到达,将其添加到缓冲区并重新开始倒计时 - 一旦倒计时结束(即生产者已经沉默了一段时间),将所有缓冲的
T
通知作为单个聚合IList<T>
通知转发。 - 如果在倒计时到期之前缓冲区大小超过了某个最大值,无论如何都要发送它。
IObservable<IList<T>> Buffer(this IObservable<T>, Timespan, int, IScheduler)
看起来很有希望,但它似乎会定期发送聚合通知,而不是执行我想要的 "start the timer when the first notification arrives and restart it when additional ones arrive" 行为,并且最后还会发送一个空列表每次 window 如果下面没有产生通知。
我不想删除任何T
通知;只是缓冲它们。
是否存在这样的东西,或者我需要自己写吗?
SO 上存在一些类似的问题,但不完全是这样。 这是一个可以解决问题的扩展方法。
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>
(this IObservable<TSource> source,
int maxAmount, TimeSpan threshold)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge( g.Buffer(maxAmount).Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
有趣的运算符。 Supertopi 的回答很好,但可以进行改进。如果 maxAmount
很大,and/or 通知率很高,那么使用 Buffer
将通过分配很快就会被丢弃的缓冲区来消耗 GC。
为了在达到 maxAmount
后关闭每个 GroupBy
Observable,您不需要捕获所有这些元素的 Buffer
只是为了知道它何时已满.根据 Supertopi 的回答,您可以将其稍微更改为以下内容。它不是收集 Buffer
个 maxAmount
个元素,而是在它看到流中的 maxAmount
个元素后才发出信号。
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source, int maxAmount, TimeSpan threshold)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge(g.Take(maxAmount)
.LastAsync()
.Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
很好的解决方案。在我看来,使用现有运算符创建行为只是为了方便而不是为了性能。
此外,我们应该始终 return IEnumerable 而不是 IList。 返回最少的派生类型 (IEnumerable) 将为您留出最大的余地来更改底层实现。
这是我实现自定义运算符的版本。
public static IObservable<IEnumerable<TValue>> BufferWithThrottle<TValue>(this IObservable<TValue> @this, int maxAmount, TimeSpan threshold)
{
var buffer = new List<TValue>();
return Observable.Create<IEnumerable<TValue>>(observer =>
{
var aTimer = new Timer();
void Clear()
{
aTimer.Stop();
buffer.Clear();
}
void OnNext()
{
observer.OnNext(buffer);
Clear();
}
aTimer.Interval = threshold.TotalMilliseconds;
aTimer.Enabled = true;
aTimer.Elapsed += (sender, args) => OnNext();
var subscription = @this.Subscribe(value =>
{
buffer.Add(value);
if (buffer.Count >= maxAmount)
OnNext();
else
{
aTimer.Stop();
aTimer.Start();
}
});
return Disposable.Create(() =>
{
Clear();
subscription.Dispose();
});
});
}
通过测试与其他解决方案相比的性能,它可以节省高达 30% CPU 的电量并解决内存问题。