当调用长于间隔长度时,反应性扩展固定异步调用之间的间隔
Reactive extension fixed Interval between async calls when call is longer than Interval length
这是我的 Interval
定义:
m_interval = Observable.Interval(TimeSpan.FromSeconds(5), m_schedulerProvider.EventLoop)
.ObserveOn(m_schedulerProvider.EventLoop)
.Select(l => Observable.FromAsync(DoWork))
.Concat()
.Subscribe();
在上面的代码中,我将 Interval
和 ObserveOn
中的 IScheduler
来自 SchedulerProvider
以便我可以更快地进行单元测试(TestScheduler.AdvanceBy).此外,DoWork
是一种 async
方法。
在我的特殊情况下,我希望每 5 秒调用一次 DoWork
函数。这里的问题是我希望 5 秒是 DoWork
结束和另一个开始之间的时间。因此,如果 DoWork
执行时间超过 5 秒,比如说 10 秒,则第一次调用将在 5 秒后进行,第二次调用将在 15 秒后进行。
不幸的是,以下测试证明它的行为并非如此:
[Fact]
public void MultiPluginStatusHelperShouldWaitForNextQuery()
{
m_queryHelperMock
.Setup(x => x.CustomQueryAsync())
.Callback(() => Thread.Sleep(10000))
.Returns(Task.FromResult(new QueryCompletedEventData()))
.Verifiable()
;
var multiPluginStatusHelper = m_container.GetInstance<IMultiPluginStatusHelper>();
multiPluginStatusHelper.MillisecondsInterval = 5000;
m_testSchedulerProvider.EventLoopScheduler.AdvanceBy(TimeSpan.FromMilliseconds(5000).Ticks);
m_testSchedulerProvider.EventLoopScheduler.AdvanceBy(TimeSpan.FromMilliseconds(5000).Ticks);
m_queryHelperMock.Verify(x => x.CustomQueryAsync(), Times.Once);
}
DoWork
调用了 CustomQueryAsync
并且测试失败说它被调用了两次。由于 .Callback(() => Thread.Sleep(1000))
.
强制延迟,它应该只被调用一次
我做错了什么?
我的实际实现来自this example。
您的问题是您的代码混合了惰性 (Observable
) 和非惰性 (Task) 结构。当您的第一个 Task
正在执行时,Interval
将再次触发并在 Select
运算符中创建一个新任务。如果你想避免这种行为,你需要将你的 Observable 包装到一个 Defer
块中:
m_interval = Observable.Interval(TimeSpan.FromSeconds(5), m_schedulerProvider.EventLoop)
.ObserveOn(m_schedulerProvider.EventLoop)
//I think `Defer` implicitly wraps Tasks, if not wrap it in `FromAsync` Again
.Select(l => Observable.Defer(() => DoWork()))
.Concat()
.Subscribe();
这样做的结果是每个 Observable
只会在订阅时执行延迟的 Task
,即当前一个完成时。
值得注意的是,如果您的生产者的生产速度比您消耗的速度快得多,这确实会产生问题,它会开始堆积,并且您的记忆也会堆积起来。作为替代方案,我建议使用 :
public static IObservable<TOut> GenerateAsync<TResult, TOut>(
Func<Task<TResult>> initialState,
Func<TResult, bool> condition,
Func<TResult, Task<TResult>> iterate,
Func<TResult, TimeSpan> timeSelector,
Func<TResult, TOut> resultSelector,
IScheduler scheduler = null)
{
var s = scheduler ?? Scheduler.Default;
return Observable.Create<TOut>(async obs => {
//You have to do your initial time delay here.
var init = await initialState();
return s.Schedule(init, timeSelector(init), async (state, recurse) =>
{
//Check if we are done
if (!condition(state))
{
obs.OnCompleted();
return;
}
//Process the result
obs.OnNext(resultSelector(state));
//Initiate the next request
state = await iterate(state);
//Recursively schedule again
recurse(state, timeSelector(state));
});
});
}
GenerateAsync(DoWork /*Initial state*/,
_ => true /*Forever*/,
_ => DoWork() /*Do your async task*/,
_ => TimeSpan.FromSeconds(5) /*Delay between events*/,
_ => _ /*Any transformations*/,
scheduler)
.Subscribe();
以上消除了 producer/consumer 比赛的问题,方法是在第一个比赛结束后才安排下一个比赛。
这个问题经常出现,通常是在轮询一些不可观察的数据源时。当我遇到它时,我使用了我前一段时间写的 RepeatAfterDelay
运算符:
public static IObservable<T> RepeatAfterDelay<T>(this IObservable<T> source, TimeSpan delay, IScheduler scheduler)
{
var repeatSignal = Observable
.Empty<T>()
.Delay(delay, scheduler);
// when source finishes, wait for the specified
// delay, then repeat.
return source.Concat(repeatSignal).Repeat();
}
这就是我的使用方式:
// do first set of work immediately, and then every 5 seconds do it again
m_interval = Observable
.FromAsync(DoWork)
.RepeatAfterDelay(TimeSpan.FromSeconds(5), scheduler)
.Subscribe();
// wait 5 seconds, then do first set of work, then again every 5 seconds
m_interval = Observable
.Timer(TimeSpan.FromSeconds(5), scheduler)
.SelectMany(_ => Observable
.FromAsync(DoWork)
.RepeatAfterDelay(TimeSpan.FromSeconds(5), scheduler))
.Subscribe();
虽然 @Brandon 的解决方案很好很干净,但我发现它阻塞了一个线程来等待延迟计时器。非阻塞替代方案可能类似于:
public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) =>
source
.Concat(
Observable.Create<T>(async observer =>
{
await Task.Delay(delay);
observer.OnCompleted();
}))
.Repeat();
这是我的 Interval
定义:
m_interval = Observable.Interval(TimeSpan.FromSeconds(5), m_schedulerProvider.EventLoop)
.ObserveOn(m_schedulerProvider.EventLoop)
.Select(l => Observable.FromAsync(DoWork))
.Concat()
.Subscribe();
在上面的代码中,我将 Interval
和 ObserveOn
中的 IScheduler
来自 SchedulerProvider
以便我可以更快地进行单元测试(TestScheduler.AdvanceBy).此外,DoWork
是一种 async
方法。
在我的特殊情况下,我希望每 5 秒调用一次 DoWork
函数。这里的问题是我希望 5 秒是 DoWork
结束和另一个开始之间的时间。因此,如果 DoWork
执行时间超过 5 秒,比如说 10 秒,则第一次调用将在 5 秒后进行,第二次调用将在 15 秒后进行。
不幸的是,以下测试证明它的行为并非如此:
[Fact]
public void MultiPluginStatusHelperShouldWaitForNextQuery()
{
m_queryHelperMock
.Setup(x => x.CustomQueryAsync())
.Callback(() => Thread.Sleep(10000))
.Returns(Task.FromResult(new QueryCompletedEventData()))
.Verifiable()
;
var multiPluginStatusHelper = m_container.GetInstance<IMultiPluginStatusHelper>();
multiPluginStatusHelper.MillisecondsInterval = 5000;
m_testSchedulerProvider.EventLoopScheduler.AdvanceBy(TimeSpan.FromMilliseconds(5000).Ticks);
m_testSchedulerProvider.EventLoopScheduler.AdvanceBy(TimeSpan.FromMilliseconds(5000).Ticks);
m_queryHelperMock.Verify(x => x.CustomQueryAsync(), Times.Once);
}
DoWork
调用了 CustomQueryAsync
并且测试失败说它被调用了两次。由于 .Callback(() => Thread.Sleep(1000))
.
我做错了什么?
我的实际实现来自this example。
您的问题是您的代码混合了惰性 (Observable
) 和非惰性 (Task) 结构。当您的第一个 Task
正在执行时,Interval
将再次触发并在 Select
运算符中创建一个新任务。如果你想避免这种行为,你需要将你的 Observable 包装到一个 Defer
块中:
m_interval = Observable.Interval(TimeSpan.FromSeconds(5), m_schedulerProvider.EventLoop)
.ObserveOn(m_schedulerProvider.EventLoop)
//I think `Defer` implicitly wraps Tasks, if not wrap it in `FromAsync` Again
.Select(l => Observable.Defer(() => DoWork()))
.Concat()
.Subscribe();
这样做的结果是每个 Observable
只会在订阅时执行延迟的 Task
,即当前一个完成时。
值得注意的是,如果您的生产者的生产速度比您消耗的速度快得多,这确实会产生问题,它会开始堆积,并且您的记忆也会堆积起来。作为替代方案,我建议使用
public static IObservable<TOut> GenerateAsync<TResult, TOut>(
Func<Task<TResult>> initialState,
Func<TResult, bool> condition,
Func<TResult, Task<TResult>> iterate,
Func<TResult, TimeSpan> timeSelector,
Func<TResult, TOut> resultSelector,
IScheduler scheduler = null)
{
var s = scheduler ?? Scheduler.Default;
return Observable.Create<TOut>(async obs => {
//You have to do your initial time delay here.
var init = await initialState();
return s.Schedule(init, timeSelector(init), async (state, recurse) =>
{
//Check if we are done
if (!condition(state))
{
obs.OnCompleted();
return;
}
//Process the result
obs.OnNext(resultSelector(state));
//Initiate the next request
state = await iterate(state);
//Recursively schedule again
recurse(state, timeSelector(state));
});
});
}
GenerateAsync(DoWork /*Initial state*/,
_ => true /*Forever*/,
_ => DoWork() /*Do your async task*/,
_ => TimeSpan.FromSeconds(5) /*Delay between events*/,
_ => _ /*Any transformations*/,
scheduler)
.Subscribe();
以上消除了 producer/consumer 比赛的问题,方法是在第一个比赛结束后才安排下一个比赛。
这个问题经常出现,通常是在轮询一些不可观察的数据源时。当我遇到它时,我使用了我前一段时间写的 RepeatAfterDelay
运算符:
public static IObservable<T> RepeatAfterDelay<T>(this IObservable<T> source, TimeSpan delay, IScheduler scheduler)
{
var repeatSignal = Observable
.Empty<T>()
.Delay(delay, scheduler);
// when source finishes, wait for the specified
// delay, then repeat.
return source.Concat(repeatSignal).Repeat();
}
这就是我的使用方式:
// do first set of work immediately, and then every 5 seconds do it again
m_interval = Observable
.FromAsync(DoWork)
.RepeatAfterDelay(TimeSpan.FromSeconds(5), scheduler)
.Subscribe();
// wait 5 seconds, then do first set of work, then again every 5 seconds
m_interval = Observable
.Timer(TimeSpan.FromSeconds(5), scheduler)
.SelectMany(_ => Observable
.FromAsync(DoWork)
.RepeatAfterDelay(TimeSpan.FromSeconds(5), scheduler))
.Subscribe();
虽然 @Brandon 的解决方案很好很干净,但我发现它阻塞了一个线程来等待延迟计时器。非阻塞替代方案可能类似于:
public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) =>
source
.Concat(
Observable.Create<T>(async observer =>
{
await Task.Delay(delay);
observer.OnCompleted();
}))
.Repeat();