IObservable - 在一段时间内忽略新元素
IObservable - Ignore new elements for a span of time
我正在尝试 "throttle" IObservable(我认为是)标准节流方法的不同方式。
我想忽略流中第一个非忽略值之后的 1s 值。
例如,如果 1s=5 个破折号
source: --1-23--45-----678901234
result: --1-----4------6----1---
关于如何实现这一点有什么想法吗?
这应该可以解决问题。可能会有更短的实现。
Scan
中的accumulate存储最后保存的Timestamp
Item
并标记是否Keep
每一项
public static IObservable<T> RateLimit<T>(this IObservable<T> source, TimeSpan duration)
{
return observable
.Timestamp()
.Scan(
new
{
Item = default(T),
Timestamp = DateTimeOffset.MinValue,
Keep = false
},
(a, x) =>
{
var keep = a.Timestamp + duration <= x.Timestamp;
return new
{
Item = x.Value,
Timestamp = keep ? x.Timestamp : a.Timestamp,
Keep = keep
};
}
})
.Where(a => a.Keep)
.Select(a => a.Item);
}
这是在 Rx 中执行此操作的惯用方法,作为扩展方法 - 下面是使用您的场景的解释和示例。
所需函数的工作方式与 Observable.Throttle
非常相似,但符合条件的事件一到达就发出,而不是延迟到节流或采样周期的持续时间。在排位赛后的给定持续时间内,后续事件将被抑制:
public static IObservable<T> SampleFirst<T>(
this IObservable<T> source,
TimeSpan sampleDuration,
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return source.Publish(ps =>
ps.Window(() => ps.Delay(sampleDuration,scheduler))
.SelectMany(x => x.Take(1)));
}
想法是使用 Window 的重载创建非重叠 windows 使用 windowClosingSelector 使用由 sampleDuration 时移回来的源。因此,每个 window 将:(a) 由其中的第一个元素关闭,并且 (b) 保持打开状态,直到允许使用新元素。然后我们简单地 select 来自每个 window.
的第一个元素
在下面的示例中,我完全重复了您的测试场景,将一个 "dash" 建模为 100 个刻度。请注意,延迟被指定为 499 个滴答而不是 500 个滴答,这是因为在多个调度程序之间传递事件的分辨率导致 1 个滴答漂移 - 实际上,您不需要详述这一点,因为单个滴答分辨率不太可能有意义。 ReactiveTest
class 和 OnNext
辅助方法通过包含 Rx 测试框架 nuget 包 rx-testing
:
提供
public class Tests : ReactiveTest
{
public void Scenario()
{
var scheduler = new TestScheduler();
var test = scheduler.CreateHotObservable<int>(
// set up events as per the OP scenario
// using 1 dash = 100 ticks
OnNext(200, 1),
OnNext(400, 2),
OnNext(500, 3),
OnNext(800, 4),
OnNext(900, 5),
OnNext(1500, 6),
OnNext(1600, 7),
OnNext(1700, 8),
OnNext(1800, 9),
OnNext(1900, 0),
OnNext(2000, 1),
OnNext(2100, 2),
OnNext(2200, 3),
OnNext(2300, 4)
);
test.SampleFirst(TimeSpan.FromTicks(499), scheduler)
.Timestamp(scheduler)
.Subscribe(x => Console.WriteLine(
"Time: {0} Value: {1}", x.Timestamp.Ticks, x.Value));
scheduler.Start();
}
}
请注意,输出是根据您的情况得出的:
Time: 200 Value: 1
Time: 800 Value: 4
Time: 1500 Value: 6
Time: 2000 Value: 1
我正在尝试 "throttle" IObservable(我认为是)标准节流方法的不同方式。
我想忽略流中第一个非忽略值之后的 1s 值。
例如,如果 1s=5 个破折号
source: --1-23--45-----678901234
result: --1-----4------6----1---
关于如何实现这一点有什么想法吗?
这应该可以解决问题。可能会有更短的实现。
Scan
中的accumulate存储最后保存的Timestamp
Item
并标记是否Keep
每一项
public static IObservable<T> RateLimit<T>(this IObservable<T> source, TimeSpan duration)
{
return observable
.Timestamp()
.Scan(
new
{
Item = default(T),
Timestamp = DateTimeOffset.MinValue,
Keep = false
},
(a, x) =>
{
var keep = a.Timestamp + duration <= x.Timestamp;
return new
{
Item = x.Value,
Timestamp = keep ? x.Timestamp : a.Timestamp,
Keep = keep
};
}
})
.Where(a => a.Keep)
.Select(a => a.Item);
}
这是在 Rx 中执行此操作的惯用方法,作为扩展方法 - 下面是使用您的场景的解释和示例。
所需函数的工作方式与 Observable.Throttle
非常相似,但符合条件的事件一到达就发出,而不是延迟到节流或采样周期的持续时间。在排位赛后的给定持续时间内,后续事件将被抑制:
public static IObservable<T> SampleFirst<T>(
this IObservable<T> source,
TimeSpan sampleDuration,
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return source.Publish(ps =>
ps.Window(() => ps.Delay(sampleDuration,scheduler))
.SelectMany(x => x.Take(1)));
}
想法是使用 Window 的重载创建非重叠 windows 使用 windowClosingSelector 使用由 sampleDuration 时移回来的源。因此,每个 window 将:(a) 由其中的第一个元素关闭,并且 (b) 保持打开状态,直到允许使用新元素。然后我们简单地 select 来自每个 window.
的第一个元素在下面的示例中,我完全重复了您的测试场景,将一个 "dash" 建模为 100 个刻度。请注意,延迟被指定为 499 个滴答而不是 500 个滴答,这是因为在多个调度程序之间传递事件的分辨率导致 1 个滴答漂移 - 实际上,您不需要详述这一点,因为单个滴答分辨率不太可能有意义。 ReactiveTest
class 和 OnNext
辅助方法通过包含 Rx 测试框架 nuget 包 rx-testing
:
public class Tests : ReactiveTest
{
public void Scenario()
{
var scheduler = new TestScheduler();
var test = scheduler.CreateHotObservable<int>(
// set up events as per the OP scenario
// using 1 dash = 100 ticks
OnNext(200, 1),
OnNext(400, 2),
OnNext(500, 3),
OnNext(800, 4),
OnNext(900, 5),
OnNext(1500, 6),
OnNext(1600, 7),
OnNext(1700, 8),
OnNext(1800, 9),
OnNext(1900, 0),
OnNext(2000, 1),
OnNext(2100, 2),
OnNext(2200, 3),
OnNext(2300, 4)
);
test.SampleFirst(TimeSpan.FromTicks(499), scheduler)
.Timestamp(scheduler)
.Subscribe(x => Console.WriteLine(
"Time: {0} Value: {1}", x.Timestamp.Ticks, x.Value));
scheduler.Start();
}
}
请注意,输出是根据您的情况得出的:
Time: 200 Value: 1
Time: 800 Value: 4
Time: 1500 Value: 6
Time: 2000 Value: 1