条件延迟+节流算子
Conditional delay+throttle operator
我正在编写一个自定义 RX 运算符,它结合了 Throttle 和 Delay 的功能,具有以下签名
public static IObservable<T> DelayWhen(this IObservable<T> self, TimeSpan delay, Func<T, bool> condition);
规则如下:
- 如果
condition(t)
returnsfalse
,立即发射。
- 如果
condition(t)
returns true
,延迟delay
时间。
- 如果
self
在延迟期间发出值,则执行以下操作:
- If
condition(t)
returns false
, cancel/skip value scheduled for delayed emission and emit the new value
- If
condition(t)
returns true
then skip/ignore 这个新值(即,如果 self
在期间不再发出任何值,则延迟值将发出延迟)。
正如您从规则中可以看出的,这里有一些行为让人联想到节流。
我为解决此问题所做的各种尝试包括一些 async
刚刚变得复杂的方法。我真的觉得这应该可以使用现有的运营商来解决。例如。参见 ,它使用 Amb
非常巧妙,我觉得它非常接近我想要实现的目标。
这是 DelayWhen
运算符的一个实现,它基于内置的 Window
运算符:
更新:我原来的实现(Revision 1)不满足问题的要求,所以我把Delay
运算符换成了定制的 delaying/throttling 运算符。
/// <summary>
/// Either delays the emission of the elements that satisfy the condition, by the
/// specified time duration, or ignores them, in case they are produced before
/// the emission of previously delayed element. Elements that don't satisfy the
/// condition are emitted immediately, and they also cancel any pending emission of
/// all previously delayed elements.
/// </summary>
public static IObservable<T> DelayWhen<T>(this IObservable<T> source,
TimeSpan delay, Func<T, bool> condition, IScheduler scheduler = null)
{
// Arguments validation omitted
scheduler ??= DefaultScheduler.Instance;
return source
.Select(x => (Item: x, WithDelay: condition(x)))
.Publish(published => published.Window(published.Where(e => !e.WithDelay)))
.Select(w => Observable.Merge(
DelayThrottleSpecial(w.Where(e => e.WithDelay), delay, scheduler),
w.Where(e => !e.WithDelay)
))
.Switch()
.Select(e => e.Item);
/// <summary>
/// Time shifts the observable sequence by the specified time duration, ignoring
/// elements that are produced while a previous element is scheduled for emission.
/// </summary>
static IObservable<T2> DelayThrottleSpecial<T2>(IObservable<T2> source,
TimeSpan dueTime, IScheduler scheduler)
{
int mutex = 0; // 0: not acquired, 1: acquired
return source.SelectMany(x =>
{
if (Interlocked.CompareExchange(ref mutex, 1, 0) == 0)
return Observable.Return(x)
.DelaySubscription(dueTime, scheduler)
.Finally(() => Volatile.Write(ref mutex, 0));
return Observable.Empty<T2>();
});
}
}
源序列被划分为连续的 windows(子序列),每个 window 以一个 false
(非延迟)元素结束。然后根据要求将每个 window 投影到具有其 true
(延迟)元素 delayed/throttled 的新 window。最后,使用 Switch
运算符将投影的 windows 合并回单个序列,以便每次发出新的 window 时丢弃 window 的所有未决元素.
问题不是很清楚,所以使用以下测试用例作为场景:
Observable.Interval(TimeSpan.FromSeconds(1))
.Take(10)
.DelayWhen(TimeSpan.FromSeconds(1.5), i => i % 3 == 0 || i % 2 == 0)
结果如下:
// T: ---1---2---3---4---5---6---7---8---9---0---1----
// original: ---0---1---2---3---4---5---6---7---8---9
// delay?: ---Y---N---Y---Y---Y---N---Y---N---Y---Y
// expected: -------1---------2-----5-------7-------------8
//
// 0: Delayed, but interrupted by 1,
// 1: Non-delayed, emit immediately
// 2: Delayed, emit after 1.5 seconds
// 3: Delayed, since emitted during a delay, ignored
// 4: Delayed, but interrupted by 5.
// 5: Non-delayed, emit immediately
// 6: Delayed, but interrupted by 7.
// 7: Non-delayed, emit immediately
// 8: Delayed, but interrupted by 9
// 9: Delayed, since emitted during a delay, ignored
如果不符合要求,请说明问题。 @Theodore 的解决方案获得了正确的时机,但发出了 3 和 9,忽略了“cancel/skip 计划延迟发射的值并发出新值”子句。
这在功能上等同于 Theodore 的代码,但 (IMO) 更易于使用和理解:
public static IObservable<T> DelayWhen2<T>(this IObservable<T> source, TimeSpan delay, Func<T, bool> condition, IScheduler scheduler)
{
return source
.Select(x => (Item: x, WithDelay: condition(x)))
.Publish(published => published
.SelectMany(t => t.WithDelay
? Observable.Return(t)
.Delay(delay, scheduler)
.TakeUntil(published.Where(t2 => !t2.WithDelay))
: Observable.Return(t)
)
)
.Select(e => e.Item);
}
从那里,我不得不嵌入你是否延迟的状态 .Scan
:
public static IObservable<T> DelayWhen3<T>(this IObservable<T> source, TimeSpan delay, Func<T, bool> condition)
{
return DelayWhen3(source, delay, condition, Scheduler.Default);
}
public static IObservable<T> DelayWhen3<T>(this IObservable<T> source, TimeSpan delay, Func<T, bool> condition, IScheduler scheduler)
{
return source
.Select(x => (Item: x, WithDelay: condition(x)))
.Publish(published => published
.Timestamp(scheduler)
.Scan((delayOverTime: DateTimeOffset.MinValue, output: Observable.Empty<T>()), (state, t) => {
if(!t.Value.WithDelay)
//value isn't delayed, current delay status irrelevant, emit immediately, and cancel previous delay.
return (DateTimeOffset.MinValue, Observable.Return(t.Value.Item));
else
if (state.delayOverTime > t.Timestamp)
//value should be delayed, but current delay already in progress. Ignore value.
return (state.delayOverTime, Observable.Empty<T>());
else
//value should be delayed, no delay in progress. Set delay state, and return delayed observable.
return (t.Timestamp + delay, Observable.Return(t.Value.Item).Delay(delay, scheduler).TakeUntil(published.Where(t2 => !t2.WithDelay)));
})
)
.SelectMany(t => t.output);
}
在 .Scan
运算符中,您嵌入了前一个 Delay
到期的时间。这样你就知道可以处理一个应该在现有延迟内延迟的值。我向时间敏感函数添加了 scheduler
参数以启用测试:
var ts = new TestScheduler();
var target = Observable.Interval(TimeSpan.FromSeconds(1), ts)
.Take(10)
.DelayWhen3(TimeSpan.FromSeconds(1.5), i => i % 3 == 0 || i % 2 == 0, ts);
var observer = ts.CreateObserver<long>();
target.Subscribe(observer);
ts.Start();
var expected = new List<Recorded<Notification<long>>> {
new Recorded<Notification<long>>(2000.MsTicks(), Notification.CreateOnNext<long>(1)),
new Recorded<Notification<long>>(4500.MsTicks(), Notification.CreateOnNext<long>(2)),
new Recorded<Notification<long>>(6000.MsTicks(), Notification.CreateOnNext<long>(5)),
new Recorded<Notification<long>>(8000.MsTicks(), Notification.CreateOnNext<long>(7)),
new Recorded<Notification<long>>(10500.MsTicks(), Notification.CreateOnNext<long>(8)),
new Recorded<Notification<long>>(10500.MsTicks() + 1, Notification.CreateOnCompleted<long>()),
};
ReactiveAssert.AreElementsEqual(expected, observer.Messages);
MsTicks 代码:
public static long MsTicks(this int i)
{
return TimeSpan.FromMilliseconds(i).Ticks;
}
我正在编写一个自定义 RX 运算符,它结合了 Throttle 和 Delay 的功能,具有以下签名
public static IObservable<T> DelayWhen(this IObservable<T> self, TimeSpan delay, Func<T, bool> condition);
规则如下:
- 如果
condition(t)
returnsfalse
,立即发射。 - 如果
condition(t)
returnstrue
,延迟delay
时间。 - 如果
self
在延迟期间发出值,则执行以下操作:- If
condition(t)
returnsfalse
, cancel/skip value scheduled for delayed emission and emit the new value - If
condition(t)
returnstrue
then skip/ignore 这个新值(即,如果self
在期间不再发出任何值,则延迟值将发出延迟)。
- If
正如您从规则中可以看出的,这里有一些行为让人联想到节流。
我为解决此问题所做的各种尝试包括一些 async
刚刚变得复杂的方法。我真的觉得这应该可以使用现有的运营商来解决。例如。参见 ,它使用 Amb
非常巧妙,我觉得它非常接近我想要实现的目标。
这是 DelayWhen
运算符的一个实现,它基于内置的 Window
运算符:
更新:我原来的实现(Revision 1)不满足问题的要求,所以我把Delay
运算符换成了定制的 delaying/throttling 运算符。
/// <summary>
/// Either delays the emission of the elements that satisfy the condition, by the
/// specified time duration, or ignores them, in case they are produced before
/// the emission of previously delayed element. Elements that don't satisfy the
/// condition are emitted immediately, and they also cancel any pending emission of
/// all previously delayed elements.
/// </summary>
public static IObservable<T> DelayWhen<T>(this IObservable<T> source,
TimeSpan delay, Func<T, bool> condition, IScheduler scheduler = null)
{
// Arguments validation omitted
scheduler ??= DefaultScheduler.Instance;
return source
.Select(x => (Item: x, WithDelay: condition(x)))
.Publish(published => published.Window(published.Where(e => !e.WithDelay)))
.Select(w => Observable.Merge(
DelayThrottleSpecial(w.Where(e => e.WithDelay), delay, scheduler),
w.Where(e => !e.WithDelay)
))
.Switch()
.Select(e => e.Item);
/// <summary>
/// Time shifts the observable sequence by the specified time duration, ignoring
/// elements that are produced while a previous element is scheduled for emission.
/// </summary>
static IObservable<T2> DelayThrottleSpecial<T2>(IObservable<T2> source,
TimeSpan dueTime, IScheduler scheduler)
{
int mutex = 0; // 0: not acquired, 1: acquired
return source.SelectMany(x =>
{
if (Interlocked.CompareExchange(ref mutex, 1, 0) == 0)
return Observable.Return(x)
.DelaySubscription(dueTime, scheduler)
.Finally(() => Volatile.Write(ref mutex, 0));
return Observable.Empty<T2>();
});
}
}
源序列被划分为连续的 windows(子序列),每个 window 以一个 false
(非延迟)元素结束。然后根据要求将每个 window 投影到具有其 true
(延迟)元素 delayed/throttled 的新 window。最后,使用 Switch
运算符将投影的 windows 合并回单个序列,以便每次发出新的 window 时丢弃 window 的所有未决元素.
问题不是很清楚,所以使用以下测试用例作为场景:
Observable.Interval(TimeSpan.FromSeconds(1))
.Take(10)
.DelayWhen(TimeSpan.FromSeconds(1.5), i => i % 3 == 0 || i % 2 == 0)
结果如下:
// T: ---1---2---3---4---5---6---7---8---9---0---1----
// original: ---0---1---2---3---4---5---6---7---8---9
// delay?: ---Y---N---Y---Y---Y---N---Y---N---Y---Y
// expected: -------1---------2-----5-------7-------------8
//
// 0: Delayed, but interrupted by 1,
// 1: Non-delayed, emit immediately
// 2: Delayed, emit after 1.5 seconds
// 3: Delayed, since emitted during a delay, ignored
// 4: Delayed, but interrupted by 5.
// 5: Non-delayed, emit immediately
// 6: Delayed, but interrupted by 7.
// 7: Non-delayed, emit immediately
// 8: Delayed, but interrupted by 9
// 9: Delayed, since emitted during a delay, ignored
如果不符合要求,请说明问题。 @Theodore 的解决方案获得了正确的时机,但发出了 3 和 9,忽略了“cancel/skip 计划延迟发射的值并发出新值”子句。
这在功能上等同于 Theodore 的代码,但 (IMO) 更易于使用和理解:
public static IObservable<T> DelayWhen2<T>(this IObservable<T> source, TimeSpan delay, Func<T, bool> condition, IScheduler scheduler)
{
return source
.Select(x => (Item: x, WithDelay: condition(x)))
.Publish(published => published
.SelectMany(t => t.WithDelay
? Observable.Return(t)
.Delay(delay, scheduler)
.TakeUntil(published.Where(t2 => !t2.WithDelay))
: Observable.Return(t)
)
)
.Select(e => e.Item);
}
从那里,我不得不嵌入你是否延迟的状态 .Scan
:
public static IObservable<T> DelayWhen3<T>(this IObservable<T> source, TimeSpan delay, Func<T, bool> condition)
{
return DelayWhen3(source, delay, condition, Scheduler.Default);
}
public static IObservable<T> DelayWhen3<T>(this IObservable<T> source, TimeSpan delay, Func<T, bool> condition, IScheduler scheduler)
{
return source
.Select(x => (Item: x, WithDelay: condition(x)))
.Publish(published => published
.Timestamp(scheduler)
.Scan((delayOverTime: DateTimeOffset.MinValue, output: Observable.Empty<T>()), (state, t) => {
if(!t.Value.WithDelay)
//value isn't delayed, current delay status irrelevant, emit immediately, and cancel previous delay.
return (DateTimeOffset.MinValue, Observable.Return(t.Value.Item));
else
if (state.delayOverTime > t.Timestamp)
//value should be delayed, but current delay already in progress. Ignore value.
return (state.delayOverTime, Observable.Empty<T>());
else
//value should be delayed, no delay in progress. Set delay state, and return delayed observable.
return (t.Timestamp + delay, Observable.Return(t.Value.Item).Delay(delay, scheduler).TakeUntil(published.Where(t2 => !t2.WithDelay)));
})
)
.SelectMany(t => t.output);
}
在 .Scan
运算符中,您嵌入了前一个 Delay
到期的时间。这样你就知道可以处理一个应该在现有延迟内延迟的值。我向时间敏感函数添加了 scheduler
参数以启用测试:
var ts = new TestScheduler();
var target = Observable.Interval(TimeSpan.FromSeconds(1), ts)
.Take(10)
.DelayWhen3(TimeSpan.FromSeconds(1.5), i => i % 3 == 0 || i % 2 == 0, ts);
var observer = ts.CreateObserver<long>();
target.Subscribe(observer);
ts.Start();
var expected = new List<Recorded<Notification<long>>> {
new Recorded<Notification<long>>(2000.MsTicks(), Notification.CreateOnNext<long>(1)),
new Recorded<Notification<long>>(4500.MsTicks(), Notification.CreateOnNext<long>(2)),
new Recorded<Notification<long>>(6000.MsTicks(), Notification.CreateOnNext<long>(5)),
new Recorded<Notification<long>>(8000.MsTicks(), Notification.CreateOnNext<long>(7)),
new Recorded<Notification<long>>(10500.MsTicks(), Notification.CreateOnNext<long>(8)),
new Recorded<Notification<long>>(10500.MsTicks() + 1, Notification.CreateOnCompleted<long>()),
};
ReactiveAssert.AreElementsEqual(expected, observer.Messages);
MsTicks 代码:
public static long MsTicks(this int i)
{
return TimeSpan.FromMilliseconds(i).Ticks;
}