自定义 Rx 运算符,仅在有最近值时才进行节流
Custom Rx operator for throttling only when there's a been a recent value
我正在尝试创建一个看起来非常有用的 Rx 运算符,但令人惊讶的是,我在 Whosebug 上没有发现任何精确匹配的问题。我想在 Throttle
上创建一个变体,如果有一段时间不活动,它可以让值立即通过。我想象的用例是这样的:
我有一个下拉菜单,它会在值更改时启动 Web 请求。如果用户按住箭头键并快速循环浏览这些值,我不想启动对每个值的请求。但是,如果我对流进行节流,则用户每次以正常方式 select 从下拉列表中选择一个值时,都必须等待节流持续时间结束。
因此,正常的 Throttle
看起来像这样:
我想创建如下所示的 ThrottleSubsequent
:
请注意,弹珠 1、2 和 6 会毫不延迟地通过,因为它们每个都经过一段时间的不活动。
我的尝试如下所示:
public static IObservable<TSource> ThrottleSubsequent<TSource>(this IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
{
// Create a timer that resets with each new source value
var cooldownTimer = source
.Select(x => Observable.Interval(dueTime, scheduler)) // Each source value becomes a new timer
.Switch(); // Switch to the most recent timer
var cooldownWindow = source.Window(() => cooldownTimer);
// Pass along the first value of each cooldown window immediately
var firstAfterCooldown = cooldownWindow.SelectMany(o => o.Take(1));
// Throttle the rest of the values
var throttledRest = cooldownWindow
.SelectMany(o => o.Skip(1))
.Throttle(dueTime, scheduler);
return Observable.Merge(firstAfterCooldown, throttledRest);
}
这 似乎 可以工作,但我很难对此进行推理,我感觉这里有一些边缘情况,重复的事情可能会变得棘手价值观什么的。我想从更有经验的 Rx-ers 那里得到一些关于这段代码是否正确的反馈,and/or是否有更惯用的方法来做到这一点。
嗯,这是一个测试套件(使用 nuget Microsoft.Reactive.Testing
):
var ts = new TestScheduler();
var source = ts.CreateHotObservable<char>(
new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')),
new Recorded<Notification<char>>(300.MsTicks(), Notification.CreateOnNext('B')),
new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('C')),
new Recorded<Notification<char>>(510.MsTicks(), Notification.CreateOnNext('D')),
new Recorded<Notification<char>>(550.MsTicks(), Notification.CreateOnNext('E')),
new Recorded<Notification<char>>(610.MsTicks(), Notification.CreateOnNext('F')),
new Recorded<Notification<char>>(760.MsTicks(), Notification.CreateOnNext('G'))
);
var target = source.ThrottleSubsequent(TimeSpan.FromMilliseconds(150), ts);
var expectedResults = ts.CreateHotObservable<char>(
new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')),
new Recorded<Notification<char>>(450.MsTicks(), Notification.CreateOnNext('B')),
new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('C')),
new Recorded<Notification<char>>(910.MsTicks(), Notification.CreateOnNext('G'))
);
var observer = ts.CreateObserver<char>();
target.Subscribe(observer);
ts.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
并使用
public static class TestingHelpers
{
public static long MsTicks(this int i)
{
return TimeSpan.FromMilliseconds(i).Ticks;
}
}
好像过了。如果你想减少它,你可以把它变成这样:
public static IObservable<TSource> ThrottleSubsequent2<TSource>(this IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
{
return source.Publish(_source => _source
.Window(() => _source
.Select(x => Observable.Interval(dueTime, scheduler))
.Switch()
))
.Publish(cooldownWindow =>
Observable.Merge(
cooldownWindow
.SelectMany(o => o.Take(1)),
cooldownWindow
.SelectMany(o => o.Skip(1))
.Throttle(dueTime, scheduler)
)
);
}
编辑:
Publish
强制共享订阅。如果你有一个坏的(或昂贵的)源可观察到订阅副作用,Publish
确保你只订阅一次。这是 Publish
有帮助的示例:
void Main()
{
var source = UglyRange(10);
var target = source
.SelectMany(i => Observable.Return(i).Delay(TimeSpan.FromMilliseconds(10 * i)))
.ThrottleSubsequent2(TimeSpan.FromMilliseconds(70), Scheduler.Default) //Works with ThrottleSubsequent2, fails with ThrottleSubsequent
.Subscribe(i => Console.WriteLine(i));
}
static int counter = 0;
public IObservable<int> UglyRange(int limit)
{
var uglySource = Observable.Create<int>(o =>
{
if (counter++ == 0)
{
Console.WriteLine("Ugly observable should only be created once.");
Enumerable.Range(1, limit).ToList().ForEach(i => o.OnNext(i));
}
else
{
Console.WriteLine($"Ugly observable should only be created once. This is the {counter}th time created.");
o.OnError(new Exception($"observable invoked {counter} times."));
}
return Disposable.Empty;
});
return uglySource;
}
我正在尝试创建一个看起来非常有用的 Rx 运算符,但令人惊讶的是,我在 Whosebug 上没有发现任何精确匹配的问题。我想在 Throttle
上创建一个变体,如果有一段时间不活动,它可以让值立即通过。我想象的用例是这样的:
我有一个下拉菜单,它会在值更改时启动 Web 请求。如果用户按住箭头键并快速循环浏览这些值,我不想启动对每个值的请求。但是,如果我对流进行节流,则用户每次以正常方式 select 从下拉列表中选择一个值时,都必须等待节流持续时间结束。
因此,正常的 Throttle
看起来像这样:
我想创建如下所示的 ThrottleSubsequent
:
请注意,弹珠 1、2 和 6 会毫不延迟地通过,因为它们每个都经过一段时间的不活动。
我的尝试如下所示:
public static IObservable<TSource> ThrottleSubsequent<TSource>(this IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
{
// Create a timer that resets with each new source value
var cooldownTimer = source
.Select(x => Observable.Interval(dueTime, scheduler)) // Each source value becomes a new timer
.Switch(); // Switch to the most recent timer
var cooldownWindow = source.Window(() => cooldownTimer);
// Pass along the first value of each cooldown window immediately
var firstAfterCooldown = cooldownWindow.SelectMany(o => o.Take(1));
// Throttle the rest of the values
var throttledRest = cooldownWindow
.SelectMany(o => o.Skip(1))
.Throttle(dueTime, scheduler);
return Observable.Merge(firstAfterCooldown, throttledRest);
}
这 似乎 可以工作,但我很难对此进行推理,我感觉这里有一些边缘情况,重复的事情可能会变得棘手价值观什么的。我想从更有经验的 Rx-ers 那里得到一些关于这段代码是否正确的反馈,and/or是否有更惯用的方法来做到这一点。
嗯,这是一个测试套件(使用 nuget Microsoft.Reactive.Testing
):
var ts = new TestScheduler();
var source = ts.CreateHotObservable<char>(
new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')),
new Recorded<Notification<char>>(300.MsTicks(), Notification.CreateOnNext('B')),
new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('C')),
new Recorded<Notification<char>>(510.MsTicks(), Notification.CreateOnNext('D')),
new Recorded<Notification<char>>(550.MsTicks(), Notification.CreateOnNext('E')),
new Recorded<Notification<char>>(610.MsTicks(), Notification.CreateOnNext('F')),
new Recorded<Notification<char>>(760.MsTicks(), Notification.CreateOnNext('G'))
);
var target = source.ThrottleSubsequent(TimeSpan.FromMilliseconds(150), ts);
var expectedResults = ts.CreateHotObservable<char>(
new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')),
new Recorded<Notification<char>>(450.MsTicks(), Notification.CreateOnNext('B')),
new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('C')),
new Recorded<Notification<char>>(910.MsTicks(), Notification.CreateOnNext('G'))
);
var observer = ts.CreateObserver<char>();
target.Subscribe(observer);
ts.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
并使用
public static class TestingHelpers
{
public static long MsTicks(this int i)
{
return TimeSpan.FromMilliseconds(i).Ticks;
}
}
好像过了。如果你想减少它,你可以把它变成这样:
public static IObservable<TSource> ThrottleSubsequent2<TSource>(this IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
{
return source.Publish(_source => _source
.Window(() => _source
.Select(x => Observable.Interval(dueTime, scheduler))
.Switch()
))
.Publish(cooldownWindow =>
Observable.Merge(
cooldownWindow
.SelectMany(o => o.Take(1)),
cooldownWindow
.SelectMany(o => o.Skip(1))
.Throttle(dueTime, scheduler)
)
);
}
编辑:
Publish
强制共享订阅。如果你有一个坏的(或昂贵的)源可观察到订阅副作用,Publish
确保你只订阅一次。这是 Publish
有帮助的示例:
void Main()
{
var source = UglyRange(10);
var target = source
.SelectMany(i => Observable.Return(i).Delay(TimeSpan.FromMilliseconds(10 * i)))
.ThrottleSubsequent2(TimeSpan.FromMilliseconds(70), Scheduler.Default) //Works with ThrottleSubsequent2, fails with ThrottleSubsequent
.Subscribe(i => Console.WriteLine(i));
}
static int counter = 0;
public IObservable<int> UglyRange(int limit)
{
var uglySource = Observable.Create<int>(o =>
{
if (counter++ == 0)
{
Console.WriteLine("Ugly observable should only be created once.");
Enumerable.Range(1, limit).ToList().ForEach(i => o.OnNext(i));
}
else
{
Console.WriteLine($"Ugly observable should only be created once. This is the {counter}th time created.");
o.OnError(new Exception($"observable invoked {counter} times."));
}
return Disposable.Empty;
});
return uglySource;
}