当调用长于间隔长度时,反应性扩展固定异步调用之间的间隔

Reactive extension fixed Interval between async calls when call is longer than Interval length

这是我的 Interval 定义:

m_interval = Observable.Interval(TimeSpan.FromSeconds(5), m_schedulerProvider.EventLoop)
                .ObserveOn(m_schedulerProvider.EventLoop)
                .Select(l => Observable.FromAsync(DoWork))
                .Concat()
                .Subscribe();

在上面的代码中,我将 IntervalObserveOn 中的 IScheduler 来自 SchedulerProvider 以便我可以更快地进行单元测试(TestScheduler.AdvanceBy).此外,DoWork 是一种 async 方法。

在我的特殊情况下,我希望每 5 秒调用一次 DoWork 函数。这里的问题是我希望 5 秒是 DoWork 结束和另一个开始之间的时间。因此,如果 DoWork 执行时间超过 5 秒,比如说 10 秒,则第一次调用将在 5 秒后进行,第二次调用将在 15 秒后进行。

不幸的是,以下测试证明它的行为并非如此:

[Fact]
public void MultiPluginStatusHelperShouldWaitForNextQuery()
{    
    m_queryHelperMock
        .Setup(x => x.CustomQueryAsync())
        .Callback(() => Thread.Sleep(10000))
        .Returns(Task.FromResult(new QueryCompletedEventData()))
        .Verifiable()
    ;

    var multiPluginStatusHelper = m_container.GetInstance<IMultiPluginStatusHelper>();
    multiPluginStatusHelper.MillisecondsInterval = 5000;
    m_testSchedulerProvider.EventLoopScheduler.AdvanceBy(TimeSpan.FromMilliseconds(5000).Ticks);
    m_testSchedulerProvider.EventLoopScheduler.AdvanceBy(TimeSpan.FromMilliseconds(5000).Ticks);

    m_queryHelperMock.Verify(x => x.CustomQueryAsync(), Times.Once);
}

DoWork 调用了 CustomQueryAsync 并且测试失败说它被调用了两次。由于 .Callback(() => Thread.Sleep(1000)).

强制延迟,它应该只被调用一次

我做错了什么?

我的实际实现来自this example

您的问题是您的代码混合了惰性 (Observable) 和非惰性 (Task) 结构。当您的第一个 Task 正在执行时,Interval 将再次触发并在 Select 运算符中创建一个新任务。如果你想避免这种行为,你需要将你的 Observable 包装到一个 Defer 块中:

m_interval = Observable.Interval(TimeSpan.FromSeconds(5), m_schedulerProvider.EventLoop)
                .ObserveOn(m_schedulerProvider.EventLoop)
                 //I think `Defer` implicitly wraps Tasks, if not wrap it in `FromAsync` Again
                .Select(l => Observable.Defer(() => DoWork()))
                .Concat()
                .Subscribe();

这样做的结果是每个 Observable 只会在订阅时执行延迟的 Task,即当前一个完成时。

值得注意的是,如果您的生产者的生产速度比您消耗的速度快得多,这确实会产生问题,它会开始堆积,并且您的记忆也会堆积起来。作为替代方案,我建议使用 :

    public static IObservable<TOut> GenerateAsync<TResult, TOut>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TimeSpan> timeSelector,
    Func<TResult, TOut> resultSelector,
    IScheduler scheduler = null) 
{
  var s = scheduler ?? Scheduler.Default;

  return Observable.Create<TOut>(async obs => {

    //You have to do your initial time delay here.
    var init = await initialState();
    return s.Schedule(init, timeSelector(init), async (state, recurse) => 
    {
      //Check if we are done
      if (!condition(state))
      {
        obs.OnCompleted();
        return;
      }

      //Process the result
      obs.OnNext(resultSelector(state));

      //Initiate the next request
      state = await iterate(state);

      //Recursively schedule again
      recurse(state, timeSelector(state));

    });
  });
}

GenerateAsync(DoWork /*Initial state*/, 
              _ => true /*Forever*/, 
              _ => DoWork() /*Do your async task*/,
              _ => TimeSpan.FromSeconds(5) /*Delay between events*/, 
              _ => _ /*Any transformations*/, 
              scheduler)
.Subscribe();

以上消除了 producer/consumer 比赛的问题,方法是在第一个比赛结束后才安排下一个比赛。

这个问题经常出现,通常是在轮询一些不可观察的数据源时。当我遇到它时,我使用了我前一段时间写的 RepeatAfterDelay 运算符:

public static IObservable<T> RepeatAfterDelay<T>(this IObservable<T> source, TimeSpan delay, IScheduler scheduler)
{
    var repeatSignal = Observable
        .Empty<T>()
        .Delay(delay, scheduler);

    // when source finishes, wait for the specified
    // delay, then repeat.
    return source.Concat(repeatSignal).Repeat();
}

这就是我的使用方式:

// do first set of work immediately, and then every 5 seconds do it again
m_interval = Observable
    .FromAsync(DoWork)
    .RepeatAfterDelay(TimeSpan.FromSeconds(5), scheduler)
    .Subscribe();

// wait 5 seconds, then do first set of work, then again every 5 seconds
m_interval = Observable
    .Timer(TimeSpan.FromSeconds(5), scheduler)
    .SelectMany(_ => Observable
        .FromAsync(DoWork)
        .RepeatAfterDelay(TimeSpan.FromSeconds(5), scheduler))
    .Subscribe();

虽然 @Brandon 的解决方案很好很干净,但我发现它阻塞了一个线程来等待延迟计时器。非阻塞替代方案可能类似于:

public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) =>
    source
        .Concat(
            Observable.Create<T>(async observer =>
            {
                await Task.Delay(delay);
                observer.OnCompleted();
            }))
        .Repeat();