在没有 onBackpressureLatest 的情况下处理 Rx.NET 中的背压
Handling backpressure in Rx.NET without onBackpressureLatest
我需要在Rx.NET中实现以下算法:
- 从
stream
获取最新项目,或者如果没有新项目,则等待新项目而不阻塞。只有最新的项目很重要,其他的可以放弃。
- 将项目输入
SlowFunction
并打印输出。
- 从第 1 步开始重复。
天真的解决方案是:
let PrintLatestData (stream: IObservable<_>) =
stream.Select(SlowFunction).Subscribe(printfn "%A")
但是,此解决方案不起作用,因为平均而言 stream
发射物品的速度比 SlowFunction
消耗物品的速度快。由于 Select
不会丢弃项目,而是尝试按从旧到新的顺序处理每个项目,因此随着程序 运行,项目被发出和打印之间的延迟将增长到无穷大。应仅从流中获取最新的最近项,以避免这种无限增长的背压。
我搜索了文档并在 RxJava 中找到了一个名为 onBackpressureLatest
的方法,根据我的理解,它可以执行我上面描述的操作。但是,Rx.NET 中不存在该方法。如何在 Rx.NET 中实现?
sync/async 建议可能会略有帮助,但是,鉴于慢速函数总是比事件流慢,使其异步可能允许您并行处理(在线程池上观察)成本(最终)仅 运行 超出线程或通过上下文切换增加更多延迟。这听起来不像是我的解决方案。
我建议您看看 Dave Sexton 编写的开源 Rxx 'Introspective' 运算符。这些可以改变您从中获取最新的 buffer/throttle 周期,因为队列由于消费者缓慢而备份。如果慢函数突然变快,它根本不会缓冲东西。如果它变慢,它会缓冲更多。
您必须检查是否有 'latest from' 类型,或者只修改现有类型以满足您的需要。例如。使用缓冲区并只取缓冲区中的最后一项,或进一步增强以仅在内部存储最新的。 Google 'Rxx',您会在 Github 某处找到它。
一个更简单的方法,如果 'slow function' 的时间是相当可预测的,就是简单地通过超过这个时间的量来限制你的流。显然,我指的不是标准 rx 'throttle',而是允许更新更新而不是旧更新的一种。这里有很多解决这类问题的方法。
我想你想使用类似 ObserveLatestOn
的东西。它有效地用单个值和一个标志替换了传入事件队列。
James World 已在此处发布相关博客 http://www.zerobugbuild.com/?p=192
这个概念在 GUI 应用程序中大量使用,它们无法相信服务器向其推送数据的速度。
您还可以在 Reactive Trader 中看到一个实现 https://github.com/AdaptiveConsulting/ReactiveTrader/blob/83a6b7f312b9ba9d70327f03d8d326934b379211/src/Adaptive.ReactiveTrader.Shared/Extensions/ObservableExtensions.cs#L64
以及解释 ReactiveTrader 的辅助演示 https://leecampbell.com/presentations/#ReactConfLondon2014
要明确这是一个 load-shedding 算法,而不是背压算法。
我前段时间也遇到了同样的问题,但我没有找到一个 built-in 运算符可以完全做到这一点。所以我写了我自己的,我称之为Latest
。实现起来并不简单,但发现它在我当前的项目中非常有用。
它是这样工作的:当观察者忙于处理之前的通知(当然是在它自己的线程上)时,它会将最后的最多 n 个通知(n >= 0)和 OnNext
s 排队观察者一旦空闲。所以:
Latest(0)
: 仅在观察者空闲时观察到达的物品
Latest(1)
: 总是观察最新的
Latest(1000)
(例如):通常处理所有项目,但如果有什么东西被卡在生产线上,宁愿错过一些也不要得到 OutOfMemoryException
Latest(int.MaxValue)
: 不遗漏任何一项,但在生产者和消费者之间进行负载平衡。
因此您的代码将是:stream.Latest(1).Select(SlowFunction).Subscribe(printfn "%A")
签名看起来像这样:
/// <summary>
/// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process.
/// </summary>
/// <param name="source">The source sequence.</param>
/// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param>
/// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception>
/// <remarks>
/// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready.
/// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any.
/// To observe the whole source sequence, specify <see cref="int.MaxValue"/>.
/// </remarks>
public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null)
实施太大,无法在此处 post,但如果有人感兴趣,我很乐意分享。让我知道。
您可以 sample
以您知道 SlowFunction
可以处理的时间间隔播放流。这是 java 中的示例:
TestScheduler ts = new TestScheduler();
Observable<Long> stream = Observable.interval(1, TimeUnit.MILLISECONDS, ts).take(500);
stream.sample(100, TimeUnit.MILLISECONDS, ts).subscribe(System.out::println);
ts.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
98
198
298
398
498
499
sample
不会造成背压,总是抓取流中的最新值,满足你的要求。此外 sample
不会发送相同的值两次 (从上面可以看出 499
只打印一次).
我认为这是一个有效的 C#
/F#
解决方案:
static IDisposable PrintLatestData<T>(IObservable<T> stream) {
return stream.Sample(TimeSpan.FromMilliseconds(100))
.Select(SlowFunction)
.Subscribe(Console.WriteLine);
}
let PrintLatestData (stream: IObservable<_>) =
stream.Sample(TimeSpan.FromMilliseconds(100))
.Select(SlowFunction)
.Subscribe(printfn "%A")
我需要在Rx.NET中实现以下算法:
- 从
stream
获取最新项目,或者如果没有新项目,则等待新项目而不阻塞。只有最新的项目很重要,其他的可以放弃。 - 将项目输入
SlowFunction
并打印输出。 - 从第 1 步开始重复。
天真的解决方案是:
let PrintLatestData (stream: IObservable<_>) =
stream.Select(SlowFunction).Subscribe(printfn "%A")
但是,此解决方案不起作用,因为平均而言 stream
发射物品的速度比 SlowFunction
消耗物品的速度快。由于 Select
不会丢弃项目,而是尝试按从旧到新的顺序处理每个项目,因此随着程序 运行,项目被发出和打印之间的延迟将增长到无穷大。应仅从流中获取最新的最近项,以避免这种无限增长的背压。
我搜索了文档并在 RxJava 中找到了一个名为 onBackpressureLatest
的方法,根据我的理解,它可以执行我上面描述的操作。但是,Rx.NET 中不存在该方法。如何在 Rx.NET 中实现?
sync/async 建议可能会略有帮助,但是,鉴于慢速函数总是比事件流慢,使其异步可能允许您并行处理(在线程池上观察)成本(最终)仅 运行 超出线程或通过上下文切换增加更多延迟。这听起来不像是我的解决方案。
我建议您看看 Dave Sexton 编写的开源 Rxx 'Introspective' 运算符。这些可以改变您从中获取最新的 buffer/throttle 周期,因为队列由于消费者缓慢而备份。如果慢函数突然变快,它根本不会缓冲东西。如果它变慢,它会缓冲更多。 您必须检查是否有 'latest from' 类型,或者只修改现有类型以满足您的需要。例如。使用缓冲区并只取缓冲区中的最后一项,或进一步增强以仅在内部存储最新的。 Google 'Rxx',您会在 Github 某处找到它。
一个更简单的方法,如果 'slow function' 的时间是相当可预测的,就是简单地通过超过这个时间的量来限制你的流。显然,我指的不是标准 rx 'throttle',而是允许更新更新而不是旧更新的一种。这里有很多解决这类问题的方法。
我想你想使用类似 ObserveLatestOn
的东西。它有效地用单个值和一个标志替换了传入事件队列。
James World 已在此处发布相关博客 http://www.zerobugbuild.com/?p=192
这个概念在 GUI 应用程序中大量使用,它们无法相信服务器向其推送数据的速度。
您还可以在 Reactive Trader 中看到一个实现 https://github.com/AdaptiveConsulting/ReactiveTrader/blob/83a6b7f312b9ba9d70327f03d8d326934b379211/src/Adaptive.ReactiveTrader.Shared/Extensions/ObservableExtensions.cs#L64 以及解释 ReactiveTrader 的辅助演示 https://leecampbell.com/presentations/#ReactConfLondon2014
要明确这是一个 load-shedding 算法,而不是背压算法。
我前段时间也遇到了同样的问题,但我没有找到一个 built-in 运算符可以完全做到这一点。所以我写了我自己的,我称之为Latest
。实现起来并不简单,但发现它在我当前的项目中非常有用。
它是这样工作的:当观察者忙于处理之前的通知(当然是在它自己的线程上)时,它会将最后的最多 n 个通知(n >= 0)和 OnNext
s 排队观察者一旦空闲。所以:
Latest(0)
: 仅在观察者空闲时观察到达的物品Latest(1)
: 总是观察最新的Latest(1000)
(例如):通常处理所有项目,但如果有什么东西被卡在生产线上,宁愿错过一些也不要得到OutOfMemoryException
Latest(int.MaxValue)
: 不遗漏任何一项,但在生产者和消费者之间进行负载平衡。
因此您的代码将是:stream.Latest(1).Select(SlowFunction).Subscribe(printfn "%A")
签名看起来像这样:
/// <summary>
/// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process.
/// </summary>
/// <param name="source">The source sequence.</param>
/// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param>
/// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception>
/// <remarks>
/// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready.
/// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any.
/// To observe the whole source sequence, specify <see cref="int.MaxValue"/>.
/// </remarks>
public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null)
实施太大,无法在此处 post,但如果有人感兴趣,我很乐意分享。让我知道。
您可以 sample
以您知道 SlowFunction
可以处理的时间间隔播放流。这是 java 中的示例:
TestScheduler ts = new TestScheduler();
Observable<Long> stream = Observable.interval(1, TimeUnit.MILLISECONDS, ts).take(500);
stream.sample(100, TimeUnit.MILLISECONDS, ts).subscribe(System.out::println);
ts.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
98
198
298
398
498
499
sample
不会造成背压,总是抓取流中的最新值,满足你的要求。此外 sample
不会发送相同的值两次 (从上面可以看出 499
只打印一次).
我认为这是一个有效的 C#
/F#
解决方案:
static IDisposable PrintLatestData<T>(IObservable<T> stream) {
return stream.Sample(TimeSpan.FromMilliseconds(100))
.Select(SlowFunction)
.Subscribe(Console.WriteLine);
}
let PrintLatestData (stream: IObservable<_>) =
stream.Sample(TimeSpan.FromMilliseconds(100))
.Select(SlowFunction)
.Subscribe(printfn "%A")